sqs (2)

💻 Programming

[AWS/SQS] cloudwatch datapoint 조회하기

안녕하세요, 이번엔 이벤트드리븐 서비스의 개선작업을 하다가 알게된 cloudwatch 통계지표 조회방법을 공유드립니다.

이 작업을 하게된 이유를 말씀드리자면 이렇습니다.

현재 SQS 를 이용한 이벤트드리븐 환경에서 동작하는 서비스가 있습니다. 

이런 저런 정보들을 동기화하기 위한 목적으로 사용하고 있고, 이 서비스는 멀티쓰레드 환경에서 동작하도록 되어있습니다.

혹시라도 이벤트가 지연될 경우를 대비해서 쓰레드 개수를 수시로 수동조절할 수 있도록 구성해두었죠.

그리고 지연이 발생하여 SQS 메시지가 발행된 뒤 일정 시간동안 처리를 못하여 큐에 계속 남아있게되면 alert가 발생하도록 되어있습니다.

즉, SQS의 ApproximateAgeOfOldestMessage 지표값을보고 너무 오래동안 처리가 안될 경우 alert를 받고 수동으로 쓰레드 개수를 조절하는 형태로 위기를 벗어나고 있습니다. 그런데 이 드물디 드문 사건이라도 개발자라면, 그리고 가능한 케이스라면 그냥 전부 다 자동화를 해놓아야 하지 않을까 생각이 들어 개선 작업에 들어갔습니다.

 

일단 SQS에서 제공하는 모니터링 지표는 아래와 같습니다. (AWS 콘솔화면의 모니터링 탭에서 볼 수 있는 것들입니다)

  • Approximate Number Of Messages Delayed
  • Approximate Number Of Messages Not Visible
  • Approximate Number Of Messages Visible
  • Approximate Age Of Oldest Message
  • Number Of Empty Receives
  • Number Of Messages Deleted
  • Number Of Messages Received
  • Number Of Messages Sent
  • Sent Message Size

이 지표에 해당하는 통계수치는 cloudwatch 에서 수집이 됩니다.

클라우드워치에서 수집된 데이터를 가지고 모니터링탭에 그래프로 보여주는 것이죠.

 

이제 제가 원하는 Approximate Age Of Oldest Message 에 대한 데이터를 뽑아보도록 하겠습니다.

일단 앱의 구성은 다음과 같습니다.

  • SpringBoot 2.3.x
  • AWS Java SDK 1.11.x
  • Java 11

AWS java sdk 에서는 cloudwatch 서비스에서 제공하는 API를 호출하여 클라우드워치 데이터를 조회할 수 있도록 cloudwatch client를 제공합니다. 제일 먼저 이 클라이언트를 빈으로 등록합니다.

@Configuration
public class AWSConfig {

    private AWSCredentialsProvider awsCredentialsProvider() {
        List<AWSCredentialsProvider> credentialsProviders = new ArrayList<>();
        credentialsProviders.add(new InstanceProfileCredentialsProvider(true));
        credentialsProviders.add(new ProfileCredentialsProvider());
        return new AWSCredentialsProviderChain(credentialsProviders);
    }

    @Bean
    public AmazonCloudWatch cloudWatchClient() {
        return AmazonCloudWatchClientBuilder.standard()
                .withCredentials(awsCredentialsProvider())
                .withRegion(Regions.fromName("ap-northeast-2"))
                .build()
                ;
    }
}

 

그리고 서비스 레이어에서 이 cloudWatchClient를 가져다 써보겠습니다.

    private void getQueueStatus(String queueName) {
        long currentMillis = System.currentTimeMillis();
        long fiveMinutesInMillis = 5 * 60 * 1000;
        GetMetricStatisticsRequest statisticsRequest = new GetMetricStatisticsRequest()
                .withNamespace("AWS/SQS").withMetricName("ApproximateAgeOfOldestMessage")
                .withStatistics(Statistic.Maximum).withPeriod(300)
                .withStartTime(new Date(currentMillis - fiveMinutesInMillis))
                .withEndTime(new Date(currentMillis))
                .withDimensions(new Dimension().withName("QueueName").withValue(queueName));

        GetMetricStatisticsResult result = cloudWatch.getMetricStatistics(statisticsRequest);
        log.debug("dataPoints: {}", result.getDatapoints());
    }

 

cloudWatchClient 를 이용하여 통계수치 데이터를 조회하려면 GetMetricStatisticsRequest 객체를 만들어서 넣어주어야 합니다. 

이 객체에 설정해줘야 하는 값들 중 필수적인 것들만 설정해보았습니다. 

간략히 설명하자면 다음과 같습니다.

  • withNameSpace: cloudwatch에서 서비스를 구분하는 값 (ex. "AWS/SQS", "AWS/EC2", etc.)
  • withMetricName: 조회하고자하는 메트릭 명
  • withStatistics: Statistic 에서 제공하는 통계기준(?), enum으로 정의되어있음
    • SampleCount
    • Average
    • Sum
    • Minimum
    • Maximum
  • withStartTime, withEndTime: 조회하려는 데이터 구간 (데이터의 시작 시점과 종료 시점)
  • withPeriod: 조회하려는 데이터 구간 내에서의 데이터 간격. 예를들면 지난 1시간 동안 몇 분 간격으로 데이터를 조회할지를 의미. 초단위값
  • withDimensions: SQS의 경우 "QueueName" 하나만 있고, 이 값으로 어떤 sqs에 대한 데이터인지 구분 가능.

(참고: Available CloudWatch metrics for Amazon SQS)

 

설정값은 현재 지난 5분동안(from endTime to startTime) 5분간격(period)의 데이터를 조회하도록 되어있으므로 1개의 data point 가 조회가 됩니다. 그리고 unit은 초단위로 나옵니다.

위 코드를 실행해서 조회한 sqs의 Approximate Age Of Oldest Message 지표값은 다음과 같이 출력됩니다.

dataPoints: [{Timestamp: Thu Dec 29 16:08:00 KST 2022,Maximum: 249.0,Unit: Seconds,}]

 

설정값을 변경하여 period를 60으로 넣어서 실행하면 5개가 조회됩니다.

dataPoints: [{Timestamp: Thu Dec 29 17:03:00 KST 2022,Maximum: 3254.0,Unit: Seconds,}, {Timestamp: Thu Dec 29 17:07:00 KST 2022,Maximum: 3554.0,Unit: Seconds,}, {Timestamp: Thu Dec 29 17:05:00 KST 2022,Maximum: 3433.0,Unit: Seconds,}, {Timestamp: Thu Dec 29 17:06:00 KST 2022,Maximum: 3491.0,Unit: Seconds,}, {Timestamp: Thu Dec 29 17:04:00 KST 2022,Maximum: 3370.0,Unit: Seconds,}]

 

조회된 5개의 데이터는 지난 5분 구간(startTime, endTime)에서 1분 간격(period) 데이터를 조회했을 때의 결과입니다. 그리고 이 값은 Approximate Age Of Oldest Message, 즉, 대략적으로 얼마나 오래되었는가를 나타내는 값이므로 1분 간격 데이터를 조회한다면 약 1분(60초)의 시간차이가 있겠죠. 출력된 데이터의 순서가 시간순이 아니니 시간순으로 정렬해보면 약 1분 정도 차이가 난다는 것을 확인할 수 있습니다. 

[
    {Timestamp: Thu Dec 29 17:03:00 KST 2022,Maximum: 3254.0,Unit: Seconds,}, 
    {Timestamp: Thu Dec 29 17:04:00 KST 2022,Maximum: 3370.0,Unit: Seconds,}
    {Timestamp: Thu Dec 29 17:05:00 KST 2022,Maximum: 3433.0,Unit: Seconds,}, 
    {Timestamp: Thu Dec 29 17:06:00 KST 2022,Maximum: 3491.0,Unit: Seconds,}, 
    {Timestamp: Thu Dec 29 17:07:00 KST 2022,Maximum: 3554.0,Unit: Seconds,}, 
]

 

17:03 에서 17:04는 예외적으로 약 2분 차이가 나네요 ^^;;

이상으로 AWS cloudwatch API로 SQS의 metric을 조회하는 방법에 대해 알아보았습니다.

 

저는 이렇게 조회한 데이터를 가지고 일정 시간을 넘어설 경우 쓰레드 개수를 scale in/out 하도록 서비스를 구현했습니다.

 

도움이 되셨다면 공감꾹 부탁드려요~

 

💻 Programming

[AWS] SQS와 Lambda를 이용하여 DLQ 처리하기

안녕하세요, 오랜만에 글을 씁니다.

 

오늘 공유드리는 내용은 분산시스템에서 DLQ 처리하는 방법에 대한 것입니다.

제가 관리하는 시스템 중 하나가 주로 카프카 메시지를 컨숨하여 처리하는 일을 하는 시스템입니다.

여러 개의 인스턴스가 존재하고 모두 동일한 소스코드가 배포되어있죠.

소스코드에는 JPA를 이용한 CRUD 로직이 들어있습니다.

 

문제가 없던 시스템인데 관련부서에서 파티션 개수를 늘리면서 문제가 생겼습니다.

단순히 파티션 개수가 늘어난게 문제가 아니라 그쪽에서 보내는 카프카 메시지의 특성과도 관련이 있었죠.

파티션마다 서로 다른 인스턴스에서 처리하지만 각 메시지에는 서로 중복된 데이터의 내용이 들어가 있는 형태였고 (이건 뭐 제가 어찌 할 수 없는 것이죠), 동일한 데이터를 서로 다른 인스턴스에서 JPA를 이용해서 처리하다가 드물게 ObjectOptimisticLockingFailureException 이 발생하기 시작했습니다. 

예를들어 인스턴스 1번에서 {A, B} 라는 메시지를 처리할 때 인스턴스 2번에서 {A,C}, 3번 인스턴스에서 {A,D} 메시지를 처리하면서 A 데이터를 3대의 인스턴스에서 동시에 처리하려고 한거죠. 세 개의 메시지는 서로 다른 데이터도 들고있으므로 무조건 처리를 해줘야 하는 것들이었습니다.

이걸 어떻게 할 까 고민하다가 SQS를 DLQ를 이용하여 Lambda 와 연결시켜 일정 시간이 지난 뒤에 수동동기화 API를 호출하도록 구성하기로 했습니다.

이렇게 한 이유는 가장 빠른시간에 추가적인 소스구현 없이 효과적으로 목적을 달성할 수 있었기 때문입니다.

만약 DLQ로 들어오는 메시지가 많아진다면 소스코드를 구현하는게 비용측면에선 더 효율적입니다.

인스턴스는 어차피 떠있어야 하는거고 람다를 제거할 수 있으니 람다에서 발생하는 비용이 줄어듭니다.

하지만 소스코드 구현시 고려해야할 점이 있죠. 인스턴스가 여러개 떠있지만 하나의 인스턴스에서만 DLQ의 메시지를 처리하도록 해야한다는 거죠. 그렇지 않으면 또 처리에 실패하는 경우가 발생할 테니까요. 그리고 모든 인스턴스에 동일한 소스코드가 배포되어야 하는데 하나의 인스턴스에서만 실행이 되도록 하려면 또 이런저런 코드를 작성해야 하다보니 람다를 이용하기로 했습니다.

 

아무튼, SQS를 DLQ로 이용하고, Lambda 함수를 연결시켜 일정 시간뒤에 수동API를 호출하여 실패했던 메시지에 대한 정보를 읽어와서 처리하도록 했습니다.

SQS Lambda Trigger 설정

이렇게 설정해놓으면 SQS 메시지가 발행될 때마다 Lambda 함수를 실행하는데 Lambda 함수에서는 API를 호출하도록 구성했습니다. 람다 트리거에 대한 내용을 AWS에서는 다음과 같이 설명하고 있습니다.

You can configure your Amazon SQS queue to trigger an AWS Lambda function when new messages arrive in the queue.
Your queue and Lambda function must be in the same AWS Region.
A Lambda function can use at most one queue as an event source. You can use the same queue with multiple Lambda functions.
You can't associate an encrypted queue that uses the AWS-managed CMK for Amazon SQS with a Lambda function in a different AWS account.

저의 경우에는 필요없긴하지만, 하나의 SQS에 여러개의 람다함수를 연결하여 사용할 수 있다는 점이 좋아보이네요.

 

자, 그리고 SQS와 연결한 Lambda 함수는 다음과 같습니다.

import logging
import json
from urllib.request import Request, urlopen
from urllib.error import URLError, HTTPError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

API_URL = "https://company.com/employees/{}"

def lambda_handler(event, context):

    message = json.loads(event['Records'][0]['body'])
    
    url = API_URL.format(message['employeeId'])
    headers = {'Content-type':'application/json'}
    
    req = Request(url, headers=headers, data=b'', method='PUT')
    
    try:
        response = urlopen(req)
        return response.getcode()
    except HTTPError as e:
        logger.error("Request failed: %d %s, sqsBody:%s", e.code, e.reason, message)
        return e.code
    except URLError as e:
        logger.error("Server connection failed: %s, sqsBody:%s", e.reason, message)
    return None

Python은 공부한적 없으나, 자바는 컴파일언어라 코드를 직접 AWS콘솔에서 작성할 수 없어서 python으로 작성했습니다.

 

내용은 별거 없고 event에 SQS 메시지의 내용이 들어오기 때문에 event에서 메시지의 body 내용을 추출하여 message 변수에 넣고 이것을 API 호출할 때 사용한것이 다입니다. 여기서는 employeeId 값이 SQS 메시지에 {"employeeId":1234} 형태로 들어있었다면, 저기서 1234 값을 추출하여 api 호출할때 path variable로 넣어서 https://company.com/employees/1234 를 호출하도록 한거죠. 

 

위 코드상에선 에러가 발생하면 그냥 로깅하거나 에러코드 리턴하도록 해놓았는데, 에러건이 많이 발생한다면 슬랙알람을 보내도록 하는 것도 좋을 것 같습니다.

 

 

이상으로 SQS와 Lambda를 이용한 분산처리시스템에서의 DLQ 처리에 대한 기록을 마칩니다.