本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
SQS使用 Step Functions Express 工作流程处理来自亚马逊的大量消息
此示例项目演示了如何使用 AWS Step Functions Express Workflow 用于处理来自高容量事件源(例如亚马逊简单队列服务 (AmazonSQS))的消息或数据。由于快速工作流可以非常高的速率启动,因此它们非常适合大批量事件处理或流数据工作负载。
以下是从事件源执行状态机的两种常用方法:
-
配置 Amazon CloudWatch Events 规则,以便在事件源发出事件时启动状态机执行。有关更多信息,请参阅创建在 CloudWatch 事件上触发的事件规则。
-
将事件源映射到 Lambda 函数,并编写函数代码来执行状态机。这些区域有: AWS Lambda 每次事件源发出事件时都会调用函数,进而启动状态机执行。有关更多信息,请参阅使用 AWS Lambda 使用亚马逊SQS。
此示例项目使用第二种方法在 Amazon 队列每次 Amazon SQS 队列发送消息时开始执行。您可以使用类似的配置,触发来自其他事件源的快速工作流执行,比如 Amazon Simple Storage Service (Amazon S3)、Amazon DynamoDB 和 Amazon Kinesis。
有关快速工作流和 Step Functions 服务集成的更多信息,请参阅以下内容:
第 1 步:创建状态机
-
打开 Step Functions 控制台
,然后选择创建状态机。 -
Process high-volume messages from SQS
在搜索框中键入,然后SQS从返回的搜索结果中选择 “处理大量邮件”。 -
选择下一步以继续。
-
选择 “运行演示” 以创建只读和 ready-to-deploy 工作流程,或者选择 “在其上构建” 以创建可编辑的状态机定义,您可以在此基础上构建并稍后部署。
该示例项目部署了以下资源:
-
四个 Lambda 函数
-
亚马逊SQS队列
-
网络 ACL 和安全组都允许 (因此可到达您的实例) 的发起 ping 的 AWS Step Functions 状态机
-
相关 AWS Identity and Access Management (IAM) 角色
下图显示了 “处理来自SQS示例项目的大量消息” 的工作流程图:
-
-
选择使用模板继续进行选择。
后续步骤取决于您之前的选择:
-
运行演示 — 您可以先查看状态机,然后再使用部署的资源创建只读项目 AWS CloudFormation 给你的 AWS 账户.
您可以查看状态机定义,准备就绪后,选择部署并运行以部署项目并创建资源。
部署最多可能需要 10 分钟才能创建资源和权限。您可以使用堆栈 ID 链接来监控进度 AWS CloudFormation.
部署完成后,您应该会在控制台中看到您的新状态机。
-
在此基础上再接再厉 — 您可以查看和编辑工作流程定义。在尝试运行自定义工作流程之前,您可能需要为示例项目中的占位符设置值。
注意
部署到您的账户的服务可能会收取标准费用。
第 2 步:触发状态机执行
-
打开 Amazon SQS 控制台
。 -
选择示例项目创建的队列。
该名称将类似于示例-SQSQueue-wJalr XUtnFEMI。
-
在 Queue Actions (队列操作) 列表中,选择 Send a Message (发送消息)。
-
使用复制按钮复制以下消息,然后在 Send a Message (发送邮件) 窗口中输入该消息,然后选择 Send Message (发送邮件)。
注意
在此示例消息中,已使用换行符对
input:
进行格式化以适应页面。使用复制按钮或以其他方式确保它作为一行输入,没有中断。{ "input": "QW5kIGxpa2UgdGhlIGJhc2VsZXNzIGZhYnJpYyBvZiB0aGlzIHZpc2lvbiwgVGhlIGNsb3VkLWNhcHBlZCB0b3dlcnMsIHRoZSBnb3JnZW
-
选择关闭。
-
前往您的 Amazon CloudWatch 日志组
并检查日志。日志组的名称将类似于示例-ExpressLogGroup-wJalr XUtnFEMI。
示例 Lambda 函数代码
以下是 Lambda 函数代码,它显示了启动的 Lambda 函数如何使用启动状态机执行 AWS SDK.
import boto3
def lambda_handler(event, context):
message_body = event['Records'][0]['body']
client = boto3.client('stepfunctions')
response = client.start_execution(
stateMachineArn='${ExpressStateMachineArn}',
input=message_body
)
示例状态机代码
此示例项目中的快速工作流包含一组用于文本处理的 Lambda 函数。
有关如何做的更多信息 AWS Step Functions 可以控制其他 AWS 服务,请参阅将服务与 Step Functions 集成。
{
"Comment": "An example of using Express workflows to run text processing for each message sent from an SQS queue.",
"StartAt": "Decode base64 string",
"States": {
"Decode base64 string": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<BASE64_DECODER_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Generate statistics"
},
"Generate statistics": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TEXT_STATS_GENERATING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Remove special characters"
},
"Remove special characters": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<STRING_CLEANING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"Next": "Tokenize and count"
},
"Tokenize and count": {
"Type": "Task",
"Resource": "arn:<PARTITION>:states:::lambda:invoke",
"OutputPath": "$.Payload",
"Parameters": {
"FunctionName": "<TOKENIZING_AND_WORD_COUNTING_LAMBDA_FUNCTION_NAME>",
"Payload.$": "$"
},
"End": true
}
}
}
IAM示例
此示例 AWS Identity and Access Management (IAM) 示例项目生成的策略包括执行状态机和相关资源所需的最低权限。我们建议您在IAM策略中仅包含必要的权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"lambda:InvokeFunction"
],
"Resource": [
"arn:aws:lambda:us-east-1:123456789012:function:example-Base64DecodeLambda-wJalrXUtnFEMI",
"arn:aws:lambda:us-east-1:123456789012:function:example-StringCleanerLambda-je7MtGbClwBF",
"arn:aws:lambda:us-east-1:123456789012:function:example-TokenizerCounterLambda-wJalrXUtnFEMI",
"arn:aws:lambda:us-east-1:123456789012:function:example-GenerateStatsLambda-je7MtGbClwBF"
],
"Effect": "Allow"
}
]
}
以下策略可确保有足够的 CloudWatch 日志权限。
{
"Version": "2012-10-17",
"Statement": [
{
"Action": [
"logs:CreateLogDelivery",
"logs:GetLogDelivery",
"logs:UpdateLogDelivery",
"logs:DeleteLogDelivery",
"logs:ListLogDeliveries",
"logs:PutResourcePolicy",
"logs:DescribeResourcePolicies",
"logs:DescribeLogGroups"
],
"Resource": [
"*"
],
"Effect": "Allow"
}
]
}
有关在将 Step Functions 与其他功能一起使用IAM时如何进行配置的信息 AWS 服务,请参阅Step Functions 如何为集成服务生成IAM策略。