분산처리시스템 (1)

💻 Programming

[AWS] SQS와 Lambda를 이용하여 DLQ 처리하기

안녕하세요, 오랜만에 글을 씁니다.

 

오늘 공유드리는 내용은 분산시스템에서 DLQ 처리하는 방법에 대한 것입니다.

제가 관리하는 시스템 중 하나가 주로 카프카 메시지를 컨숨하여 처리하는 일을 하는 시스템입니다.

여러 개의 인스턴스가 존재하고 모두 동일한 소스코드가 배포되어있죠.

소스코드에는 JPA를 이용한 CRUD 로직이 들어있습니다.

 

문제가 없던 시스템인데 관련부서에서 파티션 개수를 늘리면서 문제가 생겼습니다.

단순히 파티션 개수가 늘어난게 문제가 아니라 그쪽에서 보내는 카프카 메시지의 특성과도 관련이 있었죠.

파티션마다 서로 다른 인스턴스에서 처리하지만 각 메시지에는 서로 중복된 데이터의 내용이 들어가 있는 형태였고 (이건 뭐 제가 어찌 할 수 없는 것이죠), 동일한 데이터를 서로 다른 인스턴스에서 JPA를 이용해서 처리하다가 드물게 ObjectOptimisticLockingFailureException 이 발생하기 시작했습니다. 

예를들어 인스턴스 1번에서 {A, B} 라는 메시지를 처리할 때 인스턴스 2번에서 {A,C}, 3번 인스턴스에서 {A,D} 메시지를 처리하면서 A 데이터를 3대의 인스턴스에서 동시에 처리하려고 한거죠. 세 개의 메시지는 서로 다른 데이터도 들고있으므로 무조건 처리를 해줘야 하는 것들이었습니다.

이걸 어떻게 할 까 고민하다가 SQS를 DLQ를 이용하여 Lambda 와 연결시켜 일정 시간이 지난 뒤에 수동동기화 API를 호출하도록 구성하기로 했습니다.

이렇게 한 이유는 가장 빠른시간에 추가적인 소스구현 없이 효과적으로 목적을 달성할 수 있었기 때문입니다.

만약 DLQ로 들어오는 메시지가 많아진다면 소스코드를 구현하는게 비용측면에선 더 효율적입니다.

인스턴스는 어차피 떠있어야 하는거고 람다를 제거할 수 있으니 람다에서 발생하는 비용이 줄어듭니다.

하지만 소스코드 구현시 고려해야할 점이 있죠. 인스턴스가 여러개 떠있지만 하나의 인스턴스에서만 DLQ의 메시지를 처리하도록 해야한다는 거죠. 그렇지 않으면 또 처리에 실패하는 경우가 발생할 테니까요. 그리고 모든 인스턴스에 동일한 소스코드가 배포되어야 하는데 하나의 인스턴스에서만 실행이 되도록 하려면 또 이런저런 코드를 작성해야 하다보니 람다를 이용하기로 했습니다.

 

아무튼, SQS를 DLQ로 이용하고, Lambda 함수를 연결시켜 일정 시간뒤에 수동API를 호출하여 실패했던 메시지에 대한 정보를 읽어와서 처리하도록 했습니다.

SQS Lambda Trigger 설정

이렇게 설정해놓으면 SQS 메시지가 발행될 때마다 Lambda 함수를 실행하는데 Lambda 함수에서는 API를 호출하도록 구성했습니다. 람다 트리거에 대한 내용을 AWS에서는 다음과 같이 설명하고 있습니다.

You can configure your Amazon SQS queue to trigger an AWS Lambda function when new messages arrive in the queue.
Your queue and Lambda function must be in the same AWS Region.
A Lambda function can use at most one queue as an event source. You can use the same queue with multiple Lambda functions.
You can't associate an encrypted queue that uses the AWS-managed CMK for Amazon SQS with a Lambda function in a different AWS account.

저의 경우에는 필요없긴하지만, 하나의 SQS에 여러개의 람다함수를 연결하여 사용할 수 있다는 점이 좋아보이네요.

 

자, 그리고 SQS와 연결한 Lambda 함수는 다음과 같습니다.

import logging
import json
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

API_URL = "https://company.com/employees/{}"

def lambda_handler(event, context):

    message = json.loads(event['Records'][0]['body'])
    
    url = API_URL.format(message['employeeId'])
    headers = {'Content-type':'application/json'}
    
    req = Request(url, headers=headers, data=b'', method='PUT')
    
    try:
        response = urlopen(req)
        return response.getcode()
    except HTTPError as e:
        logger.error("Request failed: %d %s, sqsBody:%s", e.code, e.reason, message)
        return e.code
    except URLError as e:
        logger.error("Server connection failed: %s, sqsBody:%s", e.reason, message)
    return None

Python은 공부한적 없으나, 자바는 컴파일언어라 코드를 직접 AWS콘솔에서 작성할 수 없어서 python으로 작성했습니다.

 

내용은 별거 없고 event에 SQS 메시지의 내용이 들어오기 때문에 event에서 메시지의 body 내용을 추출하여 message 변수에 넣고 이것을 API 호출할 때 사용한것이 다입니다. 여기서는 employeeId 값이 SQS 메시지에 {"employeeId":1234} 형태로 들어있었다면, 저기서 1234 값을 추출하여 api 호출할때 path variable로 넣어서 https://company.com/employees/1234 를 호출하도록 한거죠. 

 

위 코드상에선 에러가 발생하면 그냥 로깅하거나 에러코드 리턴하도록 해놓았는데, 에러건이 많이 발생한다면 슬랙알람을 보내도록 하는 것도 좋을 것 같습니다.

 

 

이상으로 SQS와 Lambda를 이용한 분산처리시스템에서의 DLQ 처리에 대한 기록을 마칩니다.