Amazon SQS에서 대용량 메시지 처리(Express 워크플로) - AWS Step Functions

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon SQS에서 대용량 메시지 처리(Express 워크플로)

이 샘플 프로젝트는 AWS Step Functions 익스프레스 워크플로를 사용하여 Amazon Simple Queue Service (Amazon SQS) 와 같은 대용량 이벤트 소스의 메시지 또는 데이터를 처리하는 방법을 보여줍니다. Express 워크플로는 매우 빠른 속도로 시작할 수 있기 때문에 대용량 이벤트 처리 또는 스트리밍 데이터 워크로드에 적합합니다.

다음은 이벤트 소스에서 상태 머신을 실행하는 데 일반적으로 사용되는 두 가지 방법입니다.

  • 이벤트 소스에서 CloudWatch 이벤트가 발생할 때마다 상태 머신 실행을 시작하도록 Amazon Events 규칙을 구성합니다. 자세한 내용은 이벤트를 트리거하는 CloudWatch 이벤트 규칙 생성을 참조하십시오.

  • 이벤트 소스를 Lambda 함수에 매핑하고 함수 코드를 작성하여 상태 머신을 실행합니다. 이벤트 소스가 이벤트를 내보낼 때마다 이 AWS Lambda 함수가 호출되어 스테이트 머신 실행이 시작됩니다. 자세한 내용은 Amazon SQS에서 AWS Lambda 사용을 참조하세요.

이 샘플 프로젝트는 두 번째 방법을 사용하여 Amazon SQS 대기열에서 메시지를 보낼 때마다 실행을 시작합니다. 유사한 구성을 사용하여 Amazon Simple Storage Service(S3), Amazon DynamoDB 및 Amazon Kinesis와 같은 다른 이벤트 소스에서 Express 워크플로 실행을 트리거할 수 있습니다.

Express 워크플로 및 Step Functions 서비스 통합에 대한 자세한 내용은 다음을 참조하세요.

1단계: 상태 시스템 만들기 및 리소스 프로비저닝

  1. Step Functions 콘솔을 열고 상태 시스템 생성을 선택합니다.

  2. 검색 상자에 Process high-volume messages from SQS를 입력한 다음 반환된 검색 결과에서 SQS에서 대용량 메시지 처리를 선택합니다.

  3. 다음을 선택하여 계속 진행합니다.

  4. Step Functions는 선택한 샘플 프로젝트에 AWS 서비스 사용된 항목을 나열합니다. 또한 샘플 프로젝트의 워크플로 그래프도 보여줍니다. 이 프로젝트를 사용자 프로젝트에 AWS 계정 배포하거나 자체 프로젝트를 빌드하기 위한 출발점으로 사용하세요. 진행하려는 방식에 따라 데모 실행 또는 이를 기반으로 구축을 선택합니다.

    이 샘플 프로젝트는 다음 리소스를 배포합니다.

    • Lambda 함수 4개

    • Amazon SQS 대기열

    • AWS Step Functions 스테이트 머신

    • 관련 AWS Identity and Access Management (IAM) 역할

    다음 이미지에서는 SQS에서 대용량 메시지 처리 샘플 프로젝트의 워크플로 그래프를 보여줍니다.

    SQS에서 대용량 메시지 처리 샘플 프로젝트의 워크플로 그래프입니다.
  5. 템플릿 사용을 선택하여 계속 선택합니다.

  6. 다음 중 하나를 수행합니다.

    • 이를 기반으로 구축을 선택한 경우 Step Functions는 선택한 샘플 프로젝트의 워크플로 프로토타입을 만듭니다. Step Functions는 워크플로 정의에 나열된 리소스를 배포하지 않습니다.

      Workflow Studio의 디자인 모드에서 상태 브라우저의 상태를 끌어서 놓아 워크플로 프로토타입을 계속 빌드합니다. 또는 VS Code와 유사한 통합 코드 편집기를 제공하는 코드 모드로 전환하여 Step Functions 콘솔 내에서 상태 시스템의 Amazon States Language(ASL) 정의를 업데이트합니다. Workflow Studio를 사용하여 상태 시스템 빌드에 대한 자세한 내용은 Workflow Studio 사용을 참조하세요.

      중요

      워크플로를 실행하기 전에 샘플 프로젝트에 사용된 리소스의 자리 표시자 Amazon 리소스 이름(ARN)을 업데이트해야 합니다.

    • 데모 실행을 선택한 경우 Step Functions는 템플릿을 사용하여 해당 AWS CloudFormation 템플릿에 나열된 AWS 리소스를 사용자에게 배포하는 읽기 전용 샘플 프로젝트를 만듭니다. AWS 계정

      작은 정보

      샘플 프로젝트의 상태 시스템 정의를 보려면 코드를 선택하세요.

      준비가 되면 배포 및 실행을 선택하여 샘플 프로젝트를 배포하고 리소스를 만듭니다.

      이러한 리소스 및 관련 IAM 권한을 만드는 데 최대 10분이 걸릴 수 있습니다. 리소스를 배포하는 동안 CloudFormation Stack ID 링크를 열어 프로비저닝되는 리소스를 확인할 수 있습니다.

      샘플 프로젝트의 모든 리소스가 생성된 후에 상태 시스템 페이지에 새 샘플 프로젝트가 나열됩니다.

      중요

      템플릿에서 사용되는 각 서비스에는 표준 요금이 적용될 수 있습니다. CloudFormation

2단계: 상태 시스템 실행 트리거

  1. Amazon SQS 콘솔을 엽니다.

  2. 샘플 프로젝트에서 생성한 대기열을 선택합니다.

    이름은 Example-SQSQueue-wJalrXUtnFEMI와 유사합니다.

  3. 대기열 작업 목록에서 메시지 전송을 선택합니다.

  4. 복사 버튼을 사용하여 다음 메시지를 복사한 다음 메시지 전송 창에 입력하고 메시지 전송을 선택합니다.

    참고

    이 샘플 메시지에서 input: 줄은 페이지에 맞게 줄바꿈을 사용하여 서식이 지정되었습니다. [복사] 버튼을 사용합니다. 또는 줄바꿈 없이 한 줄로 입력되었는지 확인합니다.

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. 닫기를 선택하세요.

  6. Step Functions 콘솔을 엽니다.

  7. Amazon CloudWatch Logs 로그 그룹으로 이동하여 로그를 검사하십시오. 로그 그룹 이름은 예시- ExpressLogGroup -WJALRXUTNFEMI와 같이 표시됩니다.

Lambda 함수 코드 예제

다음은 시작 Lambda 함수가 SDK를 사용하여 상태 머신 실행을 시작하는 방법을 보여주는 Lambda 함수 코드입니다. AWS

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

예제 상태 머신 코드

이 샘플 프로젝트의 Express 워크플로는 텍스트 처리를 위한 Lambda 함수 집합으로 구성됩니다.

다른 서비스를 제어할 AWS Step Functions 수 있는 방법에 대한 자세한 내용은 을 참조하십시오. AWS 다른 서비스와 AWS Step Functions 함께 사용

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM 예제

샘플 프로젝트에서 생성된 이 예제 AWS Identity and Access Management (IAM) 정책에는 스테이트 머신 및 관련 리소스를 실행하는 데 필요한 최소 권한이 포함되어 있습니다. IAM 정책에 필요한 정책만 포함시키는 것이 좋습니다.

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

다음 정책은 로그에 대한 충분한 권한이 있는지 확인합니다. CloudWatch

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

Step Functions를 다른 AWS 서비스와 함께 사용할 때 IAM을 구성하는 방법에 대한 자세한 내용은 을 참조하십시오통합 서비스용 IAM 정책.