Amazon SQS からの大容量メッセージの処理 (Express ワークフロー) - AWS Step Functions

Amazon SQS からの大容量メッセージの処理 (Express ワークフロー)

このサンプルプロジェクトでは、AWS Step Functions Express ワークフローを使用して、Amazon Simple Queue Service (Amazon SQS) などの大容量のイベントソースからのメッセージまたはデータを処理する方法を示します。Express ワークフローは非常に高いレートで開始できるため、大容量のイベント処理やストリーミングデータワークロードに最適です。

イベントソースからステートマシンを実行するために一般的に使用される 2 つの方法は次のとおりです。

  • イベントソースがイベントを発行するたびにステートマシンの実行を開始するよう Amazon CloudWatch Events ルールを設定します。 詳細については「イベントでトリガーする CloudWatch イベントルールの作成」をご参照ください。

  • イベントソースを Lambda 関数にマッピングし、ステートマシンを実行する関数コードを記述します。 AWS Lambda 関数は、イベントソースがイベントを発行するたびに呼び出され、ステートマシンの実行が開始されます。詳細については、「Amazon SQS での AWS Lambda の使用」を参照してください。

このサンプルプロジェクトでは、2 番目の方法を使用して Amazon SQS キューがメッセージを送信するたびに実行を開始します。同様の設定を使用して、Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB、Amazon Kinesis などの他のイベントソースから Express ワークフローの実行をトリガーできます。

Express ワークフローと Step Functions サービス統合の詳細については、以下を参照してください。

ステートマシンを作成してリソースをプロビジョニングする

  1. Step Functions コンソールを開き、[ステートマシンの作成] を選択します。

  2. [サンプルプロジェクトを実行] を選択し、[Amazon SQS からの大容量メッセージを処理] を選択します。

    ステートマシン [Code] と [Visual Workflow] が表示されます。

    
          大容量の SQS ワークフローを高速処理します。
  3. [次へ] を選択します。

    作成されるリソースを示す [Deploy resources (リソースのデプロイ)] ページが表示されます。このサンプルプロジェクトでは、以下のリソースが含まれます。

    • Step Functions ステートマシン

    • Amazon SQS キュー

    • Lambda 関数

  4. [リソースのデプロイ] を選択します。

    注記

    これらのリソースおよび関連する IAM アクセス許可が作成されるまで、最大 10 分かかることがあります。[Deploy resources (リソースのデプロイ)] ページが表示されている場合は、[スタック ID] リンクを開いて、プロビジョンされているリソースを表示することができます。

トリガーの実行

  1. Amazon SQS コンソールを開きます。

  2. サンプルプロジェクトで作成されたキューを選択します。

    名前は Example-SQSQueue-wJalrXUtnFEMI のようになっています。

  3. [キュー操作] リストで、[メッセージの送信] を選択します。

  4. コピーボタンを使用して次のメッセージをコピーし、[メッセージの送信] ウィンドウに入力して、[メッセージの送信] を選択します。

    注記

    このサンプルメッセージでは、input: 行がページに合わせて改行されています。コピーボタンを使用するか、改行なしの 1 行として入力されていることを確認します。

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. [Close] を選択します。

  6. Step Functions コンソールを開きます。

  7. Amazon CloudWatch Logs ロググループに移動し、ログを調べます。ロググループの名前は、example-ExpressLogGroup-wJalrXUtnFEMI のように表示されます。

Lambda 関数コードの例

以下は、開始する Lambda 関数が AWS SDK を使用してステートマシンの実行を開始する方法を示す Lambda 関数コードです。

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

ステートマシンのコード例

このサンプルプロジェクトの Express ワークフローは、テキスト処理用の Lambda 関数のセットで構成されています。

AWS Step Functions で他の AWS のサービスを制御する方法の詳細については、「サービスの AWS Step Functions との統合 」を参照してください。

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM 例

サンプルプロジェクトによって生成されたこのサンプルの AWS Identity and Access Management (IAM) ポリシーには、ステートマシンおよび関連リソースを実行するために必要な最小限の権限が付与されています。IAM ポリシーに必要なアクセス許可のみを含めることをお勧めします。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

次のポリシーでは、CloudWatch Logs に十分なアクセス許可があることを確認します。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

他の AWS サービスで Step Functions を使用して IAM を設定する方法については、「統合サービスの IAM ポリシー」を参照してください。