處理來自 Amazon SQS 的大量訊息 (快速工作流程) - AWS Step Functions

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

處理來自 Amazon SQS 的大量訊息 (快速工作流程)

此範例專案示範如何使用 AWS Step Functions 快速工作流程處理來自大量事件來源的訊息或資料,例如 Amazon Simple Queue Service (Amazon SQS)。由於快速工作流程能以非常高的速率啟動,因此非常適用於大量事件處理或串流資料工作負載。

以下是從事件來源執行狀態機器的兩種常用方法:

  • 設定 Amazon CloudWatch 事件規則,以便在事件來源發出事件時啟動狀態機器執行。如需詳細資訊,請參閱建立在 CloudWatch 事件上觸發的事件規則

  • 將事件來源映射至 Lambda 函數,然後撰寫函數程式碼以執行您的狀態機器。每次事件來源發出事件時,都會叫用 AWS Lambda 函數,進而啟動狀態機器執行。如需詳細資訊,請參閱AWS Lambda 搭配 Amazon SQS 使用

此範例專案會在每次 Amazon SQS 佇列傳送訊息時,使用第二種方法啟動執行。您可以使用類似的組態,從其他事件來源觸發快速工作流程執行,例如 Amazon 簡單儲存服務 (Amazon S3)、Amazon DynamoDB 和 Amazon Kinesis。

如需有關 Express 工作流程和 Step Functions 服務整合的詳細資訊,請參閱下列內容:

步驟 1:建立狀態機器並佈建資源

  1. 開啟 Step Functions 主控台,然後選擇建立狀態機器

  2. Process high-volume messages from SQS在搜尋方塊中輸入,然後從傳回的搜尋結果中選擇「處理來自 SQS 的大量訊息」。

  3. 選擇 Next (下一步) 繼續。

  4. Step Functions 會列出您選取的範例專案中 AWS 服務 使用的項目。它也會顯示範例專案的工作流程圖表。將此項目部署到您的項目中, AWS 帳戶 或將其用作構建自己的項目的起點。根據您想要的進行方式,選擇 [執行示範] 或 [在其上建置]。

    此範例專案會部署下列資源:

    • 四 Lambda 数

    • Amazon SQS 隊列

    • 一个 AWS Step Functions 状态机

    • 相關的 AWS Identity and Access Management (IAM) 角色

    下列影像顯示處理來自 SQS 範例專案的大量訊息的工作流程圖形:

    
            從 SQS 範例專案處理大量訊息的工作流程圖。
  5. 選擇「使用範本」繼續進行選取。

  6. 執行以下任意一項:

    • 如果您選取「在其上建置」,「Step Functions」會為您選取的範例專案建立工作流程原型。Step Functions 不會部署工作流程定義中列出的資源。

      在工作流程 Studio 中設計模式,從拖放狀態狀態瀏覽器以繼續建立您的工作流程原型。或者切換到提供類似 VS Code 的整合式程式碼編輯器,以便在 Step Functions 主控台中更新狀態機器的 Amazon States Language (ASL) 定義。程式碼模式如需有關使用工作流程 Studio 建置狀態機器的詳細資訊,請參閱使用工作流程

      重要

      請記得在執行工作流程之前,更新範例專案中使用之資源的預留位置 Amazon 資源名稱 (ARN)。

    • 如果您選取 [執行示範],Step Functions 會建立唯讀範例專案,該專案會使用 AWS CloudFormation 範本將該範本中列出的 AWS 資源部署到您的 AWS 帳戶.

      提示

      若要檢視範例專案的狀態機定義,請選擇 [程式碼]。

      準備就緒後,請選擇 [部署並執行] 以部署範例專案並建立資源。

      建立這些資源和相關 IAM 許可最多可能需要 10 分鐘的時間。部署資源時,您可以開啟 CloudFormation Stack ID 連結以查看正在佈建的資源。

      建立範例專案中的所有資源之後,您可以在 [狀態機器] 頁面上看到列出的新範例專案。

      重要

      CloudFormation 範本中使用的每項服務可能會收取標準費用。

步驟 2:觸發狀態機執行

  1. 開啟 Amazon SQS 主控台

  2. 選取由範例專案建立的佇列。

    該名稱將類似於 Example-SQSQueue-wJalrXUtnFEMI

  3. Queue Actions (佇列動作) 清單中,選取 Send a Message (傳送訊息)

  4. 使用複製按鈕來複製以下訊息,然後在 Send a Message (傳送訊息) 視窗中,輸入該訊息,然後選擇 Send Message (傳送訊息)

    注意

    在此範例訊息中,input: 行已使用換行符號進行格式化以符合頁面。使用複製按鈕,或確保將該行輸入為不換行的單一行。

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. 選擇關閉

  6. 開啟「Step Functions」主控台

  7. 轉到您的 Amazon CloudWatch 日誌日誌組並檢查日誌。記錄群組的名稱看起來像範例 ExpressLogGroup-如何使用。

範例 Lambda 函數代碼

以下是 Lambda 函數程式碼,顯示起始 Lambda 函數如何使用 AWS SDK 啟動狀態機器執行。

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

範例狀態機器程式碼

此範例專案中的快速工作流程,包含了一組用於文字處理的 Lambda 函數。

如需如何 AWS Step Functions 控制其他 AWS 服務的詳細資訊,請參閱AWS Step Functions 搭配其他服務使用

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM 範例

範例專案產生的此範例 AWS Identity and Access Management (IAM) 政策包含執行狀態機器及相關資源所需的最低權限。我們建議您僅在 IAM 政策中加入必要的許可。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

下列原則可確保記 CloudWatch 錄檔擁有足夠的權限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

如需將 Step Functions 與其他 AWS 服務搭配使用時如何設定 IAM 的詳細資訊,請參閱整合式服務的 IAM 政策