Amazon SQS(Express 워크플로)에서 대용량 메시지 처리 - AWS Step Functions

문서의 영문과 번역 사이에 충돌이 있는 경우에는 영문 버전을 따릅니다. 번역 버전은 기계 번역을 사용하여 제공합니다.

Amazon SQS(Express 워크플로)에서 대용량 메시지 처리

이 샘플 프로젝트는 AWS Step Functions Express 워크플로를 사용하여 Amazon Simple Queue Service(Amazon SQS)와 같은 대용량 이벤트 소스의 메시지 또는 데이터를 처리하는 방법을 보여줍니다. Express 워크플로는 매우 빠른 속도로 시작할 수 있기 때문에 대용량 이벤트 처리 또는 스트리밍 데이터 워크로드에 적합합니다.

다음은 이벤트 소스에서 상태 머신을 실행하는 데 일반적으로 사용되는 두 가지 방법입니다.

  • 이벤트 소스에서 이벤트를 내보낼 때마다 상태 머신 실행을 시작하도록 Amazon CloudWatch Events 규칙을 구성합니다. 자세한 내용은 이벤트에서 트리거되는 CloudWatch 이벤트 규칙 생성을 참조하십시오.

  • 이벤트 소스를 Lambda 함수에 매핑하고 함수 코드를 작성하여 상태 머신을 실행합니다. 이벤트 소스가 이벤트를 내보낼 때마다 호출되는 AWS Lambda 함수가 상태 머신 실행을 시작합니다. 자세한 내용은 Using AWS Lambda with Amazon SQS를 참조하십시오.

두 번째 방법은 이 샘플 프로젝트에서 Amazon SQS 대기열이 메시지를 보낼 때마다 실행을 시작하는 데 사용됩니다. 유사한 구성을 사용하여 Amazon Simple Storage Service(Amazon S3), Amazon DynamoDB, Amazon Kinesis 등의 다른 이벤트 소스에서 Express 워크플로를 실행할 수 있습니다.

Express 워크플로 및 Step Functions 서비스 통합에 대한 자세한 내용은 다음을 참조하십시오.

상태 머신 생성 및 리소스 프로비저닝

  1. Step Functions 콘솔을 열고 [상태 머신 생성]을 선택합니다.

  2. [샘플 프로젝트 실행]을 선택한 다음 [Amazon SQS에서 대용량 메시지 처리]를 선택합니다.

    [Code]와 [Visual Workflow]에 상태 머신 코드와 시각적 워크플로우가 표시됩니다.

    
          고용량 SQS Express 워크플로
  3. 다음을 선택합니다.

    리소스 배포 페이지가 표시되고, 생성될 리소스가 나열됩니다. 이 샘플 프로젝트의 경우 리소스는 다음과 같습니다.

    • Step Functions 상태 머신

    • Amazon SQS 대기열

    • Lambda 기능

  4. 리소스 배포를 선택합니다.

    참고

    이러한 리소스 및 관련 IAM 권한을 생성하는 데 최대 10분이 걸릴 수 있습니다. 리소스 배포 페이지가 표시되는 동아 스택 ID 링크를 열어 어떤 리소스가 프로비저닝되고 있는지 확인할 수 있습니다.

실행 트리거

  1. Amazon SQS 콘솔을 엽니다.

  2. 샘플 프로젝트에서 생성한 대기열을 선택합니다.

    이름은 Example-SQSQueue-wJalrXUtnFEMI와 유사합니다.

  3. 대기열 작업 목록에서 메시지 전송을 선택합니다.

  4. 복사 버튼을 사용하여 다음 메시지를 복사한 다음 메시지 전송 창에 입력하고 메시지 전송을 선택합니다.

    참고

    이 샘플 메시지에서 input: 줄은 페이지에 맞게 줄바꿈을 사용하여 서식이 지정되었습니다. [복사] 버튼을 사용합니다. 또는 줄바꿈 없이 한 줄로 입력되었는지 확인합니다.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. 닫기를 선택합니다.

  6. Step Functions 콘솔을 엽니다.

  7. Amazon CloudWatch Logs 로그 그룹으로 이동한 후 로그를 검사합니다. 로그 그룹의 이름은 example-ExpressLogGroup-wJalrXUtnFEMI와 같이 표시됩니다.

샘플 Lambda 함수 코드

다음은 Lambda 시작 함수로 AWS SDK를 사용하여 상태 머신 실행을 시작하는 방법을 보여주는 Lambda 함수 코드입니다.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

예제 상태 머신 코드

이 샘플 프로젝트의 Express 워크플로는 텍스트 처리를 위한 Lambda 함수 집합으로 구성됩니다.

AWS Step Functions에서 다른 AWS 서비스를 제어할 수 있는 방법에 대한 자세한 내용은 AWS Step Functions와 서비스 통합 단원을 참조하십시오.

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM 예

샘플 프로젝트에 의해 생성된 이 예제 AWS Identity and Access Management(IAM) 정책에는 상태 머신과 관련 리소스를 실행할 최소 권한이 포함되어 있습니다. IAM 정책에 필요한 정책만 포함시키는 것이 좋습니다.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

다음 정책은 CloudWatch Logs에 대한 충분한 권한이 있는지 확인합니다.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

다른 AWS 서비스와 함께 Step Functions 사용 시 IAM 구성 방법에 대한 내용은 통합 서비스용 IAM 정책 단원을 참조하십시오.