SQS使用 Step Functions Express 工作流程处理来自亚马逊的大量消息 - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

SQS使用 Step Functions Express 工作流程处理来自亚马逊的大量消息

此示例项目演示了如何使用 AWS Step Functions Express Workflow 用于处理来自高容量事件源(例如亚马逊简单队列服务 (AmazonSQS))的消息或数据。由于快速工作流可以非常高的速率启动,因此它们非常适合大批量事件处理或流数据工作负载。

以下是从事件源执行状态机的两种常用方法:

  • 配置 Amazon CloudWatch Events 规则,以便在事件源发出事件时启动状态机执行。有关更多信息,请参阅创建在 CloudWatch 事件上触发的事件规则

  • 将事件源映射到 Lambda 函数,并编写函数代码来执行状态机。这些区域有: AWS Lambda 每次事件源发出事件时都会调用函数,进而启动状态机执行。有关更多信息,请参阅使用 AWS Lambda 使用亚马逊SQS

此示例项目使用第二种方法在 Amazon 队列每次 Amazon SQS 队列发送消息时开始执行。您可以使用类似的配置,触发来自其他事件源的快速工作流执行,比如 Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB 和 Amazon Kinesis。

有关快速工作流和 Step Functions 服务集成的更多信息,请参阅以下内容:

第 1 步:创建状态机

  1. 打开 Step Functions 控制台,然后选择创建状态机

  2. Process high-volume messages from SQS在搜索框中键入,然后SQS从返回的搜索结果中选择 “处理大量邮件”。

  3. 选择下一步以继续。

  4. 选择 “运行演示” 以创建只读和 ready-to-deploy 工作流程,或者选择 “在其上构建” 以创建可编辑的状态机定义,您可以在此基础上构建并稍后部署。

    该示例项目部署了以下资源:

    • 四个 Lambda 函数

    • 亚马逊SQS队列

    • 网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的 AWS Step Functions 状态机

    • 相关 AWS Identity and Access Management (IAM) 角色

    下图显示了 “处理来自SQS示例项目的大量消息” 的工作流程图:

    处理来自SQS示例项目的大量消息的工作流程图。
  5. 选择使用模板继续进行选择。

后续步骤取决于您之前的选择:

  1. 运行演示 — 您可以先查看状态机,然后再使用部署的资源创建只读项目 AWS CloudFormation 给你的 AWS 账户.

    您可以查看状态机定义,准备就绪后,选择部署并运行以部署项目并创建资源。

    部署最多可能需要 10 分钟才能创建资源和权限。您可以使用堆栈 ID 链接来监控进度 AWS CloudFormation.

    部署完成后,您应该会在控制台中看到您的新状态机。

  2. 在此基础上再接再厉 — 您可以查看和编辑工作流程定义。在尝试运行自定义工作流程之前,您可能需要为示例项目中的占位符设置值。

注意

部署到您的账户的服务可能会收取标准费用。

第 2 步:触发状态机执行

  1. 打开 Amazon SQS 控制台

  2. 选择示例项目创建的队列。

    该名称将类似于示例-SQSQueue-wJalr XUtnFEMI

  3. Queue Actions (队列操作) 列表中,选择 Send a Message (发送消息)

  4. 使用复制按钮复制以下消息,然后在 Send a Message (发送邮件) 窗口中输入该消息,然后选择 Send Message (发送邮件)

    注意

    在此示例消息中,已使用换行符对 input: 进行格式化以适应页面。使用复制按钮或以其他方式确保它作为一行输入,没有中断。

    { "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW 91cyBwYWxhY2VzLCBUaGUgc29sZW1uIHRlbXBsZXMsIHRoZSBncmVhdCBnbG9iZSBpdHNlbGbigJQgWWVhLCBhbGwgd2hpY2ggaXQgaW5o ZXJpdOKAlHNoYWxsIGRpc3NvbHZlLCBBbmQgbGlrZSB0aGlzIGluc3Vic3RhbnRpYWwgcGFnZWFudCBmYWRlZCwgTGVhdmUgbm90IGEgcm FjayBiZWhpbmQuIFdlIGFyZSBzdWNoIHN0dWZmIEFzIGRyZWFtcyBhcmUgbWFkZSBvbiwgYW5kIG91ciBsaXR0bGUgbGlmZSBJcyByb3Vu ZGVkIHdpdGggYSBzbGVlcC4gU2lyLCBJIGFtIHZleGVkLiBCZWFyIHdpdGggbXkgd2Vha25lc3MuIE15IG9sZCBicmFpbiBpcyB0cm91Ym xlZC4gQmUgbm90IGRpc3R1cmJlZCB3aXRoIG15IGluZmlybWl0eS4gSWYgeW91IGJlIHBsZWFzZWQsIHJldGlyZSBpbnRvIG15IGNlbGwg QW5kIHRoZXJlIHJlcG9zZS4gQSB0dXJuIG9yIHR3byBJ4oCZbGwgd2FsayBUbyBzdGlsbCBteSBiZWF0aW5nIG1pbmQu" }
  5. 选择关闭

  6. 打开 Step Functions 控制台

  7. 前往您的 Amazon CloudWatch 日志组并检查日志。日志组的名称将类似于示例-ExpressLogGroup-wJalr XUtnFEMI

示例 Lambda 函数代码

以下是 Lambda 函数代码,它显示了启动的 Lambda 函数如何使用启动状态机执行 AWS SDK.

import boto3 def lambda_handler(event, context): message_body = event['Records'][0]['body'] client = boto3.client('stepfunctions') response = client.start_execution( stateMachineArn='${ExpressStateMachineArn}', input=message_body )

示例状态机代码

此示例项目中的快速工作流包含一组用于文本处理的 Lambda 函数。

有关如何做的更多信息 AWS Step Functions 可以控制其他 AWS 服务,请参阅将服务与 Step Functions 集成

{ "Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.", "StartAt": "Decode base64 string", "States": { "Decode base64 string": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Generate statistics" }, "Generate statistics": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Remove special characters" }, "Remove special characters": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "Next": "Tokenize and count" }, "Tokenize and count": { "Type": "Task", "Resource": "arn:<PARTITION>:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>", "Payload.$": "$" }, "End": true } } }

IAM示例

此示例 AWS Identity and Access Management (IAM) 示例项目生成的策略包括执行状态机和相关资源所需的最低权限。我们建议您在IAM策略中仅包含必要的权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "lambda:InvokeFunction" ], "Resource": [ "arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF", "arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI", "arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF" ], "Effect": "Allow" } ] }

以下策略可确保有足够的 CloudWatch 日志权限。

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "logs:CreateLogDelivery", "logs:GetLogDelivery", "logs:UpdateLogDelivery", "logs:DeleteLogDelivery", "logs:ListLogDeliveries", "logs:PutResourcePolicy", "logs:DescribeResourcePolicies", "logs:DescribeLogGroups" ], "Resource": [ "*" ], "Effect": "Allow" } ] }

有关在将 Step Functions 与其他功能一起使用IAM时如何进行配置的信息 AWS 服务,请参阅Step Functions 如何为集成服务生成IAM策略