本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
处理来自 Amazon SQS(快速工作流)的大批量消息
此示例项目演示了如何使用 AWS Step Functions Express Workflow 来处理来自高容量事件源(例如亚马逊简单队列服务 (Amazon SQS))的消息或数据。由于快速工作流可以非常高的速率启动,因此它们非常适合大批量事件处理或流数据工作负载。
以下是从事件源执行状态机的两种常用方法:
-
配置 Amazon CloudWatch Events 规则,以便在事件源发出事件时启动状态机执行。有关更多信息,请参阅创建在 CloudWatch 事件上触发的事件规则。
-
将事件源映射到 Lambda 函数,并编写函数代码来执行状态机。每次事件源发出事件时,都会调用该 AWS Lambda 函数,进而启动状态机执行。有关更多信息,请参阅结合使用 AWS Lambda 与 Amazon SQS。
此示例项目使用第二种方法在 Amazon SQS 队列每次发送消息时启动执行。您可以使用类似的配置,触发来自其他事件源的快速工作流执行,比如 Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB 和 Amazon Kinesis。
有关快速工作流和 Step Functions 服务集成的更多信息,请参阅以下内容:
第 1 步:创建状态机并预置资源
-
打开 Step Functions 控制台
,然后选择创建状态机。 -
在搜索框中键入
Process high-volume messages from SQS
,然后从返回的搜索结果中选择处理来自 SQS 的大批量消息。 -
选择下一步以继续。
-
Step Functions 列出了您选择的示例项目中 AWS 服务 使用的。它还显示了示例项目的工作流图。将此项目部署到您的, AWS 账户 或者将其用作构建您自己的项目的起点。根据您想继续的方式,选择运行演示或构建依据。
该示例项目部署了以下资源:
-
四个 Lambda 函数
-
一个 Amazon SQS 队列
-
AWS Step Functions 状态机
-
相关 AWS Identity and Access Management (IAM) 角色
下图显示了处理来自 SQS 的大批量消息示例项目的工作流图:
-
-
选择使用模板继续进行选择。
-
请执行以下操作之一:
-
如果您选择构建依据,Step Functions 将为您选择的示例项目创建工作流原型。Step Functions 不会部署工作流定义中列出的资源。
在 Workflow Studio 的设计模式下,从状态浏览器中拖放状态,继续构建工作流原型。或者切换到代码模式,该模式提供了一个类似于 VS Code 的集成代码编辑器,用于在 Step Functions 控制台中更新状态机的 Amazon States Language (ASL) 定义。有关使用 Workflow Studio 构建状态机的更多信息,请参阅使用 Workflow Studio。
重要
请记住,在运行工作流之前,为示例项目中使用的资源更新占位符 Amazon 资源名称 (ARN)。
-
如果您选择了 “运行演示”,Step Functions 将创建一个只读示例项目,该项目使用 AWS CloudFormation 模板将该模板中列出的 AWS 资源部署到您的 AWS 账户。
提示
要查看示例项目的状态机定义,请选择代码。
准备就绪后,选择部署并运行以部署示例项目并创建资源。
创建这些资源和相关 IAM 权限可能需要长达 10 分钟的时间。在部署资源时,您可以打开 CloudFormation 堆栈 ID 链接以查看正在配置哪些资源。
创建示例项目中的所有资源后,您可以在状态机页面上看到新的示例项目。
重要
CloudFormation 模板中使用的每项服务都可能收取标准费用。
-
第 2 步:触发状态机执行
-
打开 Amazon SQS 控制台
。 -
选择示例项目创建的队列。
该名称将类似于 Example-SQSQueue-wJalrXUtnFEMI。
-
在 Queue Actions (队列操作) 列表中,选择 Send a Message (发送消息)。
-
使用复制按钮复制以下消息,然后在 Send a Message (发送邮件) 窗口中输入该消息,然后选择 Send Message (发送邮件)。
注意
在此示例消息中,已使用换行符对
input:
进行格式化以适应页面。使用复制按钮或以其他方式确保它作为一行输入,没有中断。{ "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW
-
选择关闭。
-
前往您的 Amazon CloudWatch 日志组
并检查日志。日志组的名称将类似于示例 ExpressLogGroup-wj alrxutnFemi。
示例 Lambda 函数代码
以下是 Lambda 函数代码,它显示了启动的 Lambda 函数如何使用软件开发工具包启动状态机执行。 AWS
import boto3
def lambda_handler(event, context):
message_body = event['Records'][0]['body']
client = boto3.client('stepfunctions')
response = client.start_execution(
stateMachineArn='${ExpressStateMachineArn}',
input=message_body
)
示例状态机代码
此示例项目中的快速工作流包含一组用于文本处理的 Lambda 函数。
有关 AWS Step Functions 如何控制其他 AWS 服务的更多信息,请参阅与其他服务 AWS Step Functions 一起使用。
{
"Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
"StartAt": "Decode base64 string",
"States": {
"Decode base64 string": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Generate statistics"
},
"Generate statistics": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Remove special characters"
},
"Remove special characters": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Tokenize and count"
},
"Tokenize and count": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"End": true
}
}
}
IAM 示例
示例项目生成的此示例 AWS Identity and Access Management (IAM) 策略包括执行状态机和相关资源所需的最低权限。我们建议在您的 IAM 策略中仅包含这些必需的权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI",
"arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF",
"arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI",
"arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF"
],
"Effect": "Allow"
}
]
}
以下策略可确保有足够的 CloudWatch 日志权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
],
"Resource": [
"*"
],
"Effect": "Allow"
}
]
}
有关在将 Step Functions 与其他 AWS 服务一起使用时如何配置 IAM 的信息,请参阅集成服务的 IAM 策略。