Amazon SQS에서 Lambda 사용 - AWS Lambda

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon SQS에서 Lambda 사용

Lambda 함수를 사용하여 Amazon Simple Queue Service(Amazon SQS) 대기열의 메시지를 처리할 수 있습니다. Lambda 이벤트 소스 매핑표준 대기열선입선출(FIFO) 대기열을 지원합니다. Amazon SQS를 사용하면 작업을 대기열로 전송하고 비동기식으로 처리하여 애플리케이션의 구성 요소 중 하나에서 작업을 오프로드할 수 있습니다.

Lambda는 대기열을 폴링하고 대기열 메시지를 포함한 이벤트와 동기적으로 Lambda 함수를 호출합니다. Lambda는 메시지를 배치 단위로 읽고 각 배치에 대해 한 번씩 함수를 호출합니다. 함수가 배치를 성공적으로 처리하면 Lambda는 대기열에서 메시지를 삭제합니다.

Lambda가 배치를 읽으면 메시지는 대기열에 머무르지만 대기열의 가시성 시간제한 동안 숨겨집니다. 함수가 배치를 성공적으로 처리하면 Lambda는 대기열에서 메시지를 삭제합니다. 기본적으로 함수에서 배치를 처리하는 동안 오류가 발생하는 경우 가시성 제한 시간이 만료된 후 해당 배치의 모든 메시지가 대기열에 다시 표시됩니다. 이러한 이유로 함수 코드는 의도하지 않은 부작용 없이 동일한 메시지를 여러 번 처리할 수 있어야 합니다.

Lambda가 메시지를 여러 번 처리하지 못하도록 하려면 함수 응답에 일괄 항목 실패를 포함하도록 이벤트 소스 매핑을 구성하거나 Lambda 함수가 메시지를 성공적으로 처리할 때 Amazon SQS API DeleteMessage작업을 사용하여 대기열에서 메시지를 제거할 수 있습니다. Amazon SQS API 사용에 대한 자세한 내용은 Amazon 심플 큐 서비스 API 레퍼런스를 참조하십시오.

표준 대기열 메시지 이벤트의 예

예 Amazon SQS 메시지 이벤트(표준 대기열)
{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

기본적으로 Lambda는 대기열에서 최대 10개의 메시지를 한 번에 폴링하고 해당 배치를 함수로 보냅니다. 소수의 레코드로 함수를 호출하는 것을 피하려면 배치 기간을 구성하여 이벤트 소스가 최대 5분 동안 레코드를 버퍼링하도록 지정할 수 있습니다. 함수를 호출하기 전에 Lambda는 배치 기간이 만료되거나, 호출 페이로드 크기 할당량에 도달하거나, 구성된 최대 배치 크기에 도달할 때까지 SQS 표준 대기열에서 메시지를 계속 폴링합니다.

배치 기간을 사용하고 SQS 대기열에 트래픽이 매우 적은 경우, Lambda는 함수를 호출하기 전에 최대 20초까지 기다릴 수 있습니다. 이는 배치 기간을 20초 미만으로 설정한 경우에도 마찬가지입니다.

참고

Java에서는 JSON을 역직렬화할 때 null 포인터 오류가 발생할 수 있습니다. 이는 JSON 객체 매퍼가 ‘Records’ 및 “eventSourceARN’의 대소문자를 변환하는 방식 때문일 수 있습니다.

FIFO 대기열 메시지 이벤트의 예

FIFO 대기열의 경우 레코드에는 중복 제거 및 시퀀싱과 관련된 추가 속성이 포함되어 있습니다.

예 Amazon SQS 메시지 이벤트(FIFO 대기열)
{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

Lambda에서 사용할 수 있도록 대기열 구성

Lambda 함수의 이벤트 소스로 제공할 SQS 대기열을 만듭니다. 그런 다음 Lambda 함수가 각 이벤트 배치를 처리하고 Lambda가 확장 시 제한 오류에 응답해 다시 시도할 수 있는 시간을 허용하도록 대기열을 구성합니다.

함수 시간이 각 레코드 배치를 처리할 수 있도록 하려면 소스 대기열의 가시성 제한 시간을 함수에 구성한 제한 시간의 6배 이상으로 설정하세요. 이전 배치를 처리하는 동안 함수가 제한되는 경우 추가 시간을 통해 Lambda가 재시도할 수 있습니다.

함수가 메시지를 여러 번 처리하지 못하면 Amazon SQS에서 메시지를 배달 못한 편지 대기열로 보낼 수 있습니다. 함수에서 오류를 반환하면 배치 내 모든 항목이 대기열로 반환됩니다. 가시성 시간 초과가 발생하면 Lambda는 메시지를 다시 수신합니다. 여러 번 수신한 후 메시지를 두 번째 대기열로 보내려면 소스 대기열에 배달 못한 편지 대기열을 구성합니다.

참고

Lambda 함수가 아닌 소스 대기열에서 배달 못한 편지 대기열을 구성해야 합니다. 함수에서 구성하는 배달 못한 편지 대기열은 이벤트 소스 대기열이 아닌 함수의 비동기식 호출 대기열에 사용됩니다.

함수가 오류를 반환하거나 최대 동시성 상태이기 때문에 호출할 수 없는 경우에는 추가 시도로 처리에 성공할 수 있습니다. 메시지를 배달 못한 편지 대기열로 보내기 전에 메시지를 더 잘 처리할 수 있도록 하려면 원본 대기열의 리드라이브 정책에서 maxReceiveCount5이상으로 설정합니다.

실행 역할 권한

AWSLambdaSQSQueueExecutionRoleAWS관리형 정책에는 Lambda가 Amazon SQS 대기열에서 읽는 데 필요한 권한이 포함됩니다. 이 관리형 정책을 함수의 실행 역할에 추가하십시오.

선택적으로 암호화된 대기열을 사용하는 경우 실행 역할에 다음 권한도 추가해야 합니다.

권한을 추가하고 이벤트 소스 매핑을 생성하세요.

이벤트 소스 매핑을 생성하여 Lambda가 대기열의 항목을 Lambda 함수로 전송하게 할 수 있습니다. 여러 이벤트 소스 매핑을 생성하여 하나의 함수로 여러 대기열의 항목을 처리할 수 있습니다. Lambda가 대상 함수를 호출하면 이벤트에는 구성 가능한 최대 배치 크기까지 항목이 여러 개 포함될 수 있습니다.

Amazon SQS에서 읽도록 함수를 구성하려면 AWSLambdaSQSQueueExecutionRoleAWS관리형 정책을 실행 역할에 연결한 다음 SQS 트리거를 생성하십시오.

권한을 추가하고 트리거를 생성하려면
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수의 이름을 선택합니다.

  3. 구성(Configuration) 탭을 선택한 다음, 권한(Permissions)을 선택합니다.

  4. 역할 이름에서 실행 역할 링크를 선택합니다. 이 링크는 IAM 콘솔에서 역할을 엽니다.

    
              실행 역할 링크
  5. 권한 추가를 선택하고 정책 연결을 선택합니다.

    
              IAM 콘솔에 정책 연결
  6. 검색 필드에 AWSLambdaSQSQueueExecutionRole를 입력합니다. 이 정책을 실행 역할에 추가하세요. 이는 함수가 Amazon SQS 대기열에서 읽는 데 필요한 권한을 포함하는 AWS 관리형 정책입니다. 이 정책에 대한 자세한 내용은 AWS관리형 정책 참조를 참조하십시오 AWSLambdaSQSQueueExecutionRole.

  7. Lambda 콘솔에서 함수로 돌아가십시오. 함수 개요(Function overview)에서 트리거 추가(Add trigger)를 선택합니다.

    
              Lambda 콘솔의 함수 개요 섹션
  8. 트리거 유형을 선택합니다.

  9. 필요한 옵션을 구성한 다음 추가를 선택합니다.

Lambda는 Amazon SQS 이벤트 소스에 대해 다음과 같은 옵션을 지원합니다.

SQS 대기열

레코드를 읽어 올 Amazon SQS 대기열입니다.

트리거 활성화

이벤트 소스 매핑의 상태입니다. Enable trigger(트리거 활성화)는 기본적으로 선택됩니다.

배치 크기

각 배치에서 함수에 보낼 레코드 최대 수입니다. 표준 대기열의 경우 최대 10,000개의 레코드가 될 수 있습니다. FIFO 대기열의 경우 최대값은 10입니다. 10을 초과하는 배치 크기의 경우 배치 기간(MaximumBatchingWindowInSeconds)도 최소 1초로 설정해야 합니다.

함수 제한 시간은 항목의 전체 배치를 처리할 시간이 충분하도록 구성합니다. 항목을 처리하는 데 걸리는 시간이 길면 더 작은 배치 크기를 선택합니다. 배치 크기가 크면 매우 빠르거나 오버헤드가 큰 워크로드에 대한 효율성에 영향을 미칠 수 있습니다. 함수에 예약된 동시성을 구성할 경우 최소 동시성 실행 수를 5로 설정하여 Lambda가 함수를 호출할 때 오류를 제한할 수 있는 가능성을 줄이세요.

이벤트의 총 크기가 동기식 호출에 대한 호출 페이로드 크기 할당량(6MB)을 초과하지 않는 한 Lambda는 단일 호출로 배치의 모든 레코드를 함수에 전달합니다. Lambda와 Amazon SQS는 모두 각 레코드의 메타데이터를 생성합니다. 이 추가 메타데이터는 총 페이로드 크기에 포함되며, 그러면 배치로 전송된 총 레코드 수가 구성된 배치 크기보다 작을 수 있습니다. Amazon SQS에서 전송하는 메타데이터 필드는 길이가 가변적일 수 있습니다. Amazon SQS 메타데이터 필드에 대한 자세한 내용은 Amazon 단순 대기열 서비스 ReceiveMessageAPI 참조의 API 작업 설명서를 참조하십시오.

배치 기간

함수를 호출하기 전에 기록을 수집할 최대 기간(단위: 초)입니다. 이 지표는 표준 대기열에만 적용됩니다.

0초보다 큰 배치 기간을 사용하는 경우 대기열의 가시성 제한 시간에서 늘어난 처리 시간을 고려해야 합니다. 대기열의 가시성 제한 시간을 함수 제한 시간의 6배에 MaximumBatchingWindowInSeconds 값을 더한 값으로 설정하는 것이 좋습니다. 이렇게 하면 Lambda 함수가 각 이벤트 배치를 처리하고 제한 오류가 발생할 경우 다시 시도할 수 있습니다.

메시지를 사용할 수 있게 되면 Lambda는 메시지를 일괄 처리하기 시작합니다. Lambda는 함수를 동시에 5번 호출하여 한 번에 5개의 배치를 처리하기 시작합니다. 메시지를 계속 사용할 수 있는 경우 Lambda는 분당 최대 300개의 함수 인스턴스를 추가해 최대 1,000개까지 추가합니다. 함수 확장 및 동시성에 대한 자세한 내용은 Lambda 함수 크기 조정을 참조하세요.

더 많은 메시지를 처리하기 위해 Lambda 함수를 최적화하여 처리량을 높일 수 있습니다. AWS Lambda의 Amazon SQS 표준 대기열 크기 조정 방식 이해를 참조하세요.

최대 동시성

이벤트 소스가 호출할 수 있는 최대 동시성 함수 수입니다. 자세한 설명은 Amazon SQS 이벤트 소스의 최대 동시성 구성 섹션을 참조하세요.

필터 기준

필터 기준을 추가하여 Lambda가 처리를 위해 함수로 보내는 이벤트를 제어합니다. 자세한 설명은 Lambda 이벤트 필터링 섹션을 참조하세요.

조정 및 처리

표준 대개열의 경우 Lambda는 긴 폴링을 사용하여 대기열이 활성화될 때까지 대기열을 폴링합니다. 메시지를 사용할 수 있는 경우 Lambda는 함수를 동시에 5번 호출하여 한 번에 5개의 배치를 처리하기 시작합니다. 메시지를 계속 사용할 수 있는 경우 Lambda는 배치를 읽는 프로세스의 수를 분당 최대 300개의 추가 인스턴스까지 증가시킵니다. 이벤트 소스 매핑으로 동시에 처리할 수 있는 최대 배치 수는 1,000개입니다.

FIFO 대기열의 경우, Lambda는 메시지를 수신하는 순서대로 함수에 메시지를 보냅니다. FIFO 대기열에 메시지를 전송할 때 메시지 그룹 ID를 지정합니다. Amazon SQS는 동일한 그룹의 메시지가 순서대로 Lambda에 전송되도록 합니다. Lambda는 메시지를 그룹으로 분류하고 그룹에 대해 한 번에 한 배치만 보냅니다. 함수가 오류를 반환하면 함수는 Lambda가 동일한 그룹에서 추가 메시지를 수신하기 전에 영향을 받는 메시지에 대해 모든 재시도를 시도합니다.

함수는 활성 메시지 그룹의 수에 동시성을 조정할 수 있습니다. 자세한 내용은 AWS 컴퓨팅 블로그의 이벤트 소스로 SQS FIFO를 참조하세요.

Amazon SQS 이벤트 소스의 최대 동시성 구성

최대 동시성 설정은 Amazon SQS 이벤트 소스가 호출할 수 있는 함수의 동시 인스턴스 수를 제한합니다. 최대 동시성은 이벤트 소스 수준 설정입니다. 여러 Amazon SQS 이벤트 소스가 하나의 함수에 매핑되어 있는 경우, 각 이벤트 소스마다 별도의 최대 동시성 설정이 있을 수 있습니다. 최대 동시성을 사용하여 단일 대기열이 함수의 예약된 동시성 전체를 사용하거나 계정의 나머지 동시성 할당량을 사용하지 못하도록 할 수 있습니다. Amazon SQS 이벤트 소스에 대해 최대 동시성을 구성하는 데는 요금이 부과되지 않습니다.

중요한 것은 최대 동시성과 예약된 동시성은 독립된 두 설정이라는 것입니다. 함수의 예약된 동시성보다 큰 최대 동시성을 설정하지 마세요. 최대 동시성을 구성한 경우, 함수의 예약된 동시성이 함수의 모든 Amazon SQS 이벤트 소스에 대한 총 최대 동시성보다 크거나 같은지 확인합니다. 그렇지 않으면 Lambda가 메시지를 제한할 수도 있습니다.

최대 동시성이 설정되지 않은 경우 Lambda는 Amazon SQS 이벤트 소스를 계정의 총 동시성 할당량(기본값 1,000)까지 확장할 수 있습니다.

참고

FIFO 대기열의 경우 동시 호출은 메시지 그룹 ID(messageGroupId) 또는 최대 동시성 설정 중 더 낮은 값으로 제한됩니다. 예를 들어, 메시지 그룹 ID가 6개이고 최대 동시성이 10으로 설정된 경우 함수는 최대 6개의 동시 호출을 가질 수 있습니다.

신규 및 기존 Amazon SQS 이벤트 소스 매핑에 대해 최대 동시성을 구성할 수 있습니다.

Lambda 콘솔을 사용하여 최대 동시성 구성
  1. Lambda 콘솔의 함수 페이지를 엽니다.

  2. 함수의 이름을 선택합니다.

  3. Function overview(함수 개요)에서 SQS를 선택합니다. 그러면 Configuration(구성) 탭이 열립니다.

  4. Amazon SQS 트리거를 선택하고 Edit(편집)를 선택합니다.

  5. Maximum concurrency(최대 동시성)에 2에서 1,000 사이의 숫자를 입력합니다. 최대 동시성을 해제하려면 상자를 비워 둡니다.

  6. 저장을 선택합니다.

AWS Command Line Interface(AWS CLI)를 사용하여 최대 동시성 구성

--scaling-config 옵션과 함께 update-event-source-mapping 명령을 사용합니다. 예제

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config '{"MaximumConcurrency":5}'

최대 동시성을 해제하려면 --scaling-config에 빈 값을 입력합니다.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config "{}"
Lambda API를 사용하여 최대 동시성 구성

CreateEventSourceMapping또는 UpdateEventSourceMapping작업을 ScalingConfig객체와 함께 사용하십시오.

이벤트 소스 매핑 API

AWS Command Line Interface(AWS CLI) 또는 AWS SDK를 사용하여 이벤트 소스를 관리하려면 다음 API 작업을 사용할 수 있습니다.

다음 예제는 AWS CLI를 사용하여 배치 크기가 5이고 배치 기간이 60초인 Amazon 리소스 이름(ARN)으로 지정된 Amazon SQS 대기열에 my-function이라는 함수를 매핑합니다.

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

다음 결과가 표시됩니다.

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }

실패한 호출에 대한 백오프 전략

호출이 실패하면 Lambda는 백오프 전략을 구현하면서 호출을 재시도합니다. 백오프 전략은 Lambda가 함수 코드의 오류로 인해 장애를 겪었는지 아니면 제한으로 인해 장애가 발생했는지에 따라 약간 다릅니다.

  • 함수 코드로 인해 오류가 발생한 경우 Lambda는 Amazon SQS 이벤트 소스 매핑에 할당되는 동시성의 양을 줄여 재시도를 점진적으로 백오프합니다. 호출이 계속 실패하면 Lambda는 결국 다시 시도하지 않고 메시지를 삭제합니다.

  • 제한으로 인해 호출이 실패하는 경우 Lambda는 Amazon SQS 이벤트 소스 매핑에 할당되는 동시성의 양을 줄여 재시도를 점진적으로 백오프합니다. Lambda는 메시지의 타임스탬프가 대기열의 가시성 제한 시간을 초과할 때(이때 Lambda가 메시지를 삭제)까지 메시지를 계속 재시도합니다.

부분 일괄 응답 구현

배치를 처리하는 동안 Lambda 함수에 오류가 발생하면 Lambda가 성공적으로 처리한 메시지를 포함하여 해당 배치의 모든 메시지가 기본적으로 대기열에 다시 표시됩니다. 따라서 함수가 동일한 메시지를 여러 번 처리할 수 있습니다.

실패한 배치에서 정상 처리된 메시지를 재처리하지 않으려면 실패한 메시지만 다시 표시하도록 이벤트 소스 매핑을 구성할 수 있습니다. 이를 부분 일괄 응답이라고 합니다. 부분 일괄 응답을 켜려면 이벤트 소스 매핑을 구성할 때 FunctionResponseTypes작업에 ReportBatchItemFailures 대해 지정하십시오. 그러면 함수가 부분적인 성공을 반환할 수 있으므로 레코드에 대한 불필요한 재시도 횟수를 줄일 수 있습니다.

ReportBatchItemFailures이 활성화되면 Lambda는 함수 호출이 실패할 때 메시지 폴링을 축소하지 않습니다. 일부 메시지에 오류가 발생할 것으로 예상되고 이러한 오류가 메시지 처리 속도에 영향을 미치지 않도록 하려면 ReportBatchItemFailures를 사용하십시오.

참고

부분 일괄 응답을 사용할 때는 다음 사항에 유의하세요.

  • 함수에서 예외가 발생한 경우 전체 배치가 완전한 실패로 간주됩니다.

  • FIFO 대기열과 함께 이 기능을 사용하는 경우 함수는 첫 번째 실패 후 메시지 처리를 중지하고 batchItemFailures에서 모든 실패한 메시지와 처리되지 않은 메시지를 반환해야 합니다. 그러면 대기열의 메시지 순서를 유지할 수 있습니다.

부분 배치 보고를 활성화 방법
  1. 부분 일괄 응답 구현에 대한 모범 사례를 검토하세요.

  2. 다음 명령을 실행하여 함수의 ReportBatchItemFailures을 활성화합니다. 이벤트 소스 매핑의 UUID를 검색하려면 명령을 실행합니다. list-event-source-mappingsAWS CLI

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. 함수 코드를 업데이트하여 모든 예외를 포착하고 실패한 메시지를 batchItemFailures JSON 응답으로 반환하세요. batchItemFailures 응답에는 메시지 ID 목록이 itemIdentifier JSON 값으로 포함되어야 합니다.

    예를 들어 메시지 ID가 id1, id2, id3, id4, id5인 5개의 메시지로 구성된 배치가 있다고 가정합니다. 함수가 id1, id3, id5를 성공적으로 처리합니다. id2id4 메시지를 대기열에서 다시 볼 수 있도록 하려면 함수가 다음 응답을 리턴해야 합니다.

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    다음은 일괄적으로 실패한 메시지 ID 목록을 반환하는 함수 코드의 몇 가지 예입니다.

    .NET
    AWS SDK for .NET
    참고

    자세한 내용은 에서 확인할 수 있습니다. GitHub 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    .NET을 사용하여 Lambda로 SQS 배치 항목 실패 보고

    using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    SDK for Go V2
    참고

    더 많은 것이 있어요 GitHub. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Go를 사용하여 Lambda에서 SQS 배치 항목 장애를 보고합니다.

    package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    Java 2.x SDK
    참고

    자세한 내용은 여기를 참조하십시오. GitHub 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Java를 사용하여 Lambda로 SQS 배치 항목 실패 보고

    import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    PHP
    SDK for PHP
    참고

    더 많은 것이 있어요 GitHub. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    PHP를 사용하여 Lambda에서 SQS 배치 항목 장애를 보고합니다.

    <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $sqsEvent = new SqsEvent($event); $this->logger->info("Processing SQS records"); $records = $sqsEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getMessageId(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); // Format failures for the response $failures = array_map( fn(string $messageId) => ['itemIdentifier' => $messageId], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger); ?>
    Python
    SDK for Python (Boto3)
    참고

    자세한 내용은 여기를 참조하십시오. GitHub 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Python을 사용하여 Lambda로 SQS 배치 항목 실패 보고

    import json def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    SDK for Ruby
    참고

    더 많은 것이 있어요 GitHub. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Ruby를 사용하여 Lambda로 SQS 배치 항목 실패를 보고합니다.

    require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    SDK for Rust
    참고

    더 많은 것이 있어요 GitHub. 서버리스 예제 리포지토리에서 전체 예제를 찾아보고 설정 및 실행 방법을 알아봅니다.

    Rust를 사용하여 Lambda로 SQS 배치 항목 실패를 보고합니다.

    use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

실패한 이벤트가 대기열로 돌아오지 않는 경우 Lambda 함수 SQS의 문제를 해결하려면 어떻게 해야 합니까? 를 참조하십시오. ReportBatchItemFailures 지식 센터에서. AWS

성공 및 실패 조건

Lambda는 함수가 다음 중 하나를 반환할 경우 배치를 완전한 성공으로 처리합니다.

  • 비어 있는 batchItemFailures 목록

  • null batchItemFailures 목록

  • 비어 있는 EventResponse

  • null EventResponse

Lambda는 함수가 다음 중 하나를 반환할 경우 배치를 완전한 실패로 처리합니다.

  • 잘못된 JSON 응답

  • 빈 문자열 itemIdentifier

  • null itemIdentifier

  • 키 이름이 잘못된 itemIdentifier

  • 존재하지 않는 메시지 ID가 있는 itemIdentifier

CloudWatch 메트릭스

함수가 배치 항목 실패를 제대로 보고하고 있는지 확인하려면 Amazon에서 NumberOfMessagesDeletedApproximateAgeOfOldestMessage Amazon SQS 지표를 모니터링할 수 있습니다. CloudWatch

  • NumberOfMessagesDeleted는 대기열에서 제거된 메시지 수를 추적합니다. 이 값이 0으로 떨어지면 함수 응답이 실패한 메시지를 올바르게 반환하지 않는다는 신호입니다.

  • ApproximateAgeOfOldestMessage는 가장 오래된 메시지가 대기열에 머물렀던 시간을 추적합니다. 이 지표가 급격히 증가하면 함수가 실패한 메시지를 올바르게 반환하지 않음을 나타낼 수 있습니다.

Amazon SQS 구성 파라미터

모든 Lambda 이벤트 소스 유형은 CreateEventSourceMapping동일한 UpdateEventSourceMappingAPI 작업을 공유합니다. 그러나 일부 파라미터만 Amazon SQS에 적용됩니다.

Amazon SQS에 적용되는 이벤트 소스 파라미터
파라미터 필수 기본값 참고

BatchSize

N

10

표준 대기열의 경우 최댓값은 10,000입니다. FIFO 대기열의 경우 최댓값은 10입니다.

활성

N

true

EventSourceArn

Y

데이터 스트림 또는 스트림 소비자의 ARN

FunctionName

Y

FilterCriteria

N

Lambda 이벤트 필터링

FunctionResponseTypes

N

함수가 배치에서 특정 실패를 보고하도록 하려면 FunctionResponseTypesReportBatchItemFailures 값을 포함하세요. 자세한 설명은 부분 일괄 응답 구현 섹션을 참조하세요.

MaximumBatchingWindowInSeconds

N

0

ScalingConfig

N

Amazon SQS 이벤트 소스의 최대 동시성 구성