Amazon SQS からの大容量メッセージの処理 (Express ワークフロー) - AWS Step Functions

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon SQS からの大容量メッセージの処理 (Express ワークフロー)

このサンプルプロジェクトでは、 AWS Step Functions Express ワークフローを使用して、Amazon Simple Queue Service (Amazon SQS) などの大量のイベントソースからのメッセージまたはデータを処理する方法を示します。Express ワークフローは非常に高いレートで開始できるため、大容量のイベント処理やストリーミングデータワークロードに最適です。

イベントソースからステートマシンを実行するために一般的に使用される 2 つの方法は次のとおりです。

  • イベントソースが CloudWatch イベントを発行するたびにステートマシンの実行を開始するように Amazon Events ルールを設定します。詳細については、 CloudWatch 「イベント でトリガーするイベントルールの作成」を参照してください。

  • イベントソースを Lambda 関数にマッピングし、ステートマシンを実行する関数コードを記述します。 AWS Lambda 関数は、イベントソースがイベントを発行するたびに呼び出され、ステートマシンの実行が開始されます。詳細については、Amazon SQS を使った AWS Lambda の使用を参照してください。

このサンプルプロジェクトでは、2 番目の方法を使用して Amazon SQS キューがメッセージを送信するたびに実行をスタートします。同様の設定を使用して、Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB、Amazon Kinesis などの他のイベントソースから Express ワークフローの実行をトリガーできます。

Express ワークフローと Step Functions サービス統合の詳細については、以下を参照してください。

ステップ 1: ステートマシンを作成してリソースをプロビジョニングする

  1. Step Functions コンソールを開き、[ステートマシンの作成] を選択します。

  2. 検索ボックスに Process high-volume messages from SQS と入力し、返された検索結果から [SQS からの大容量メッセージを処理] を選択します。

  3. [次へ] を選択して続行します。

  4. Step Functions は、選択したサンプルプロジェクトで AWS サービス 使用されている を一覧表示します。サンプルプロジェクトのワークフローグラフも表示されます。このプロジェクトを にデプロイ AWS アカウント するか、独自のプロジェクトを構築するための出発点として使用します。進める方法に応じて、[デモの実行] または [その上に構築する] を選択します。

    このサンプルプロジェクトは、以下のリソースをデプロイします。

    • 4 つの Lambda 関数

    • Amazon SQSキュー

    • AWS Step Functions ステートマシン

    • 関連する AWS Identity and Access Management (IAM) ロール

    以下のイメージは、[SQS からの大容量メッセージを処理] サンプルプロジェクトのワークフローグラフを示しています。

    [SQS からの大容量メッセージを処理] サンプルプロジェクトのワークフローグラフ。
  5. [テンプレートの使用] を選択して選択を続行します。

  6. 次のいずれかを行います。

    • [その上に構築する] を選択した場合、Step Functions は選択したサンプルプロジェクトのワークフロープロトタイプを作成します。Step Functions は、ワークフロー定義にリストされているリソースをデプロイしません。

      Workflow Studio の デザインモード では、[State browser] (状態ブラウザ) から状態をドラッグアンドドロップして、ワークフロープロトタイプの構築を続行できます。または、VS Code と同様の統合コードエディタを提供する コードモード に切り替えて、Step Functions コンソール内のステートマシンの Amazon ステートメント言語 (ASL) 定義を更新してください。Workflow Studio を使用してステートマシンを構築する方法の詳細については、「Workflow Studio を使用する」を参照してください。

      重要

      ワークフローを実行する前に、サンプルプロジェクトで使用されているリソースのプレースホルダー Amazon リソースネーム (ARN) を必ず更新してください。

    • デモの実行を選択した場合、Step Functions は、 AWS CloudFormation テンプレートを使用して、そのテンプレートにリストされている AWS リソースを にデプロイする読み取り専用サンプルプロジェクトを作成します AWS アカウント。

      ヒント

      サンプルプロジェクトのステートマシン定義を表示するには、[コード] を選択します。

      準備できたら、[デプロイと実行] を選択してサンプルプロジェクトをデプロイし、リソースを作成します。

      これらのリソースおよび関連する IAM 許可が作成されるまで、最大 10 分かかることがあります。リソースのデプロイ中に、 CloudFormation スタック ID リンクを開いて、プロビジョニングされているリソースを確認できます。

      サンプルプロジェクトのすべてのリソースが作成されると、新しいサンプルプロジェクトが [ステートマシン] ページに表示されます。

      重要

      CloudFormation テンプレートで使用されるサービスごとに、標準料金が適用される場合があります。

ステップ 2: ステートマシンの実行をトリガーする

  1. [Amazon SQS console] (Amazon SQS コンソール) を開きます。

  2. サンプルプロジェクトで作成されたキューを選択します。

    名前は Example-SQSQueue-wJalrXUtnFEMI のようになっています。

  3. [Queue Actions] (キュー操作) リストで、[Send a Message] (メッセージの送信) を選択します。

  4. コピーボタンを使用して次のメッセージをコピーし、[Send a Message] (メッセージの送信) ウィンドウに入力して、[Send Message] (メッセージの送信) を選択します。

    注記

    このサンプルメッセージでは、input: 行がページに合わせて改行されています。コピーボタンを使用するか、改行なしの 1 行として入力されていることを確認します。

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. [Close] (閉じる) を選択します。

  6. Step Functions コンソールを開きます。

  7. Amazon CloudWatch Logs ロググループに移動し、ログを調べます。ロググループの名前は example-ExpressLogGroup-wJalrXUtnFEMI のようになります。

Lambda 関数のコードの例

以下は、開始する Lambda 関数が AWS SDK を使用してステートマシンの実行を開始する方法を示す Lambda 関数コードです。

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

ステートマシンのコード例

このサンプルプロジェクトの Express ワークフローは、テキスト処理用の Lambda 関数のセットで構成されています。

AWS Step Functions が他の AWS のサービスをコントロールする方法の詳細については、「」を参照してくださいAWS Step Functions 他のサービスとの併用

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM の例

サンプルプロジェクトによって生成されたこの AWS Identity and Access Management (IAM) ポリシーの例には、ステートマシンと関連リソースを実行するために必要な最小権限が含まれています。IAM ポリシーで必要なアクセス許可のみを含めることをお勧めします。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

次のポリシーは、 CloudWatch ログに対する十分なアクセス許可があることを確認します。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

他の AWS サービスで Step Functions を使用するときに IAM を設定する方法については、「」を参照してください統合サービスの IAM ポリシー