本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
教程:创建可筛选源事件的 EventBridge 管道
在本教程中,您将创建一个管道,将 DynamoDB 流源连接到 Amazon SQS 队列目标。其中包括为管道指定事件模式,用于筛选要传送到队列的事件。然后,您将测试管道以确保只传送所需的事件。
先决条件:创建源和目标
在创建管道之前,您需要创建管道要连接的源和目标。在本例中,Amazon DynamoDB 数据流作为管道源,Amazon SQS 队列作为管道目标。
要简化此步骤,您可以使用 AWS CloudFormation 预置源和目标资源。方法是创建一个 CloudFormation 模板,定义以下资源:
-
管道源
名为
pipe-tutorial-source
的 Amazon DynamoDB 表,启用了一个流,提供有关 DynamoDB 表中项目更改情况的的排序信息流。 管道目标
名为
pipe-tutorial-target
的 Amazon SQS 队列,用于从您的管道接收 DynamoDB 事件流。
创建 CloudFormation 模板,预置管道资源
复制下方 用于生成先决条件的 AWS CloudFormation 模板 部分中的 JSON 模板文本。
将模板另存为 JSON 文件(例如,
~/pipe-tutorial-resources.json
)。
接下来,使用您刚刚创建的模板文件来预置 CloudFormation 堆栈。
注意
创建 CloudFormation 堆栈后,您需要为其预置的 AWS 资源付费。
使用 AWS CLI 预置教程先决条件
运行以下 CLI 命令,其中
--template-body
指定模板文件的位置:aws cloudformation create-stack --stack-name
pipe-tuturial-resources
--template-body file://~/pipe-tutorial-resources.json
使用 CloudFormation 控制台预置教程先决条件
-
打开 AWS CloudFormation 控制台,地址:https://console.aws.amazon.com/cloudformation
。 依次选择堆栈、选择创建堆栈、使用新资源(标准)。
CloudFormation 会显示创建堆栈向导。
对于先决条件 - 准备模板,保留默认值,模板已准备就绪已选中。
在指定模板下选择上传模板文件,然后选择文件并选择下一步。
-
配置堆栈及其要预置的资源:
对于堆栈名称,输入
pipe-tuturial-resources
。在参数中,请保留 DynamoDB 表和 Amazon SQS 队列的默认名称。
选择下一步。
选择下一步,然后选择提交。
CloudFormation 创建堆栈并预置模板中定义的资源。
有关 CloudFormation 的更多信息,请参阅《AWS CloudFormation 用户指南》中的什么是 AWS CloudFormation?。
步骤 1:创建管道
预置管道源和目标后,您现在可以创建管道来连接这两个服务。
使用 EventBridge 控制台创建管道
访问 https://console.aws.amazon.com/events/
,打开 Amazon EventBridge 控制台。 在导航窗格中,选择管道。
选择创建管道。
对于名称,将管道命名为
pipe-tutorial
。指定 DynamoDB 数据流源:
在详细信息下,对于源,选择 DynamoDB 数据流。
EventBridge 将显示特定于 DynamoDB 的源配置设置。
对于 DynamoDB 流,选择
pipe-tutorial-source
。将起始位置保留为默认值
Latest
。选择下一步。
指定并测试用于筛选事件的事件模式:
通过筛选,您可以控制管道将哪些事件发送到富集或目标。管道仅将与事件模式匹配的事件发送到富集或目标。
有关更多信息,请参阅 Amazon EventBridge 管道中的事件筛选。
注意
您只需为发送到富集或目标的事件付费。
在示例事件 - 可选 下,保持 AWS 事件的选中状态,并确保选中 DynamoDB 流示例事件 1。
这是您将用来测试我们的事件模式的示例事件。
在事件模式下,输入以下事件模式:
{ "eventName": ["INSERT", "MODIFY"] }
选择测试模式。
EventBridge 会显示一条消息,说明示例事件与事件模式匹配。这是因为示例事件的
eventName
值为INSERT
。选择下一步。
选择下一步,跳过指定富集的操作。
在此示例中,您无需选择富集。富集支持您选择一项服务,增强来自源的数据,然后再将其发送到目标。有关更多详细信息,请参阅Amazon EventBridge 管道中的事件扩充。
将您的 Amazon SQS 队列指定为管道目标:
在详细信息下,目标服务选择 Amazon SQS 队列。
对于队列,选择
pipe-tutorial-target
。将目标输入转换器部分留空。
有关更多信息,请参阅 Amazon EventBridge Pipes 输入转换。
选择创建管道
EventBridge 会创建管道并显示管道详细信息页面。一旦管道的状态更新为
Running
,即说明管道已准备就绪。
步骤 2:确认管道筛选器事件
管道已设置完毕,但尚未从表中接收事件。
要测试管道,您需要更新 DynamoDB 表中的条目。每次更新都会生成事件,DynamoDB 流会将这些事件发送到我们的管道。有些事件会匹配您指定的事件模式,有些则不会。然后,您可以检查 Amazon SQS 队列,确保管道仅传送了与我们的事件模式匹配的事件。
更新表项目,生成事件
打开 DynamoDB 控制台:https://console.aws.amazon.com/dynamodb/
。 在左侧导航栏上,选择表。选择
pipe-tutorial-source
表。DynamoDB 显示
pipe-tutorial-source
的表详细信息页面。选择浏览表项目,然后选择创建项目。
DynamoDB 会显示创建项目页面。
在属性下,创建一个新的表项目:
在专辑中输入
Album A
。在艺术家中输入
Artist A
。选择创建项目。
更新表项目:
在返回的项目下,选择 Album A。
选择添加新属性,然后选择字符串。
输入
Song
的新值,值为Song A
。选择 Save changes(保存更改)。
删除表项目:
在返回的项目下,选中 Album A。
从操作菜单中选择删除项目。
您已对表项目进行了三次更新;这会为 DynamoDB 数据流生成三个事件:
创建项目时会生成
INSERT
事件。为项目添加属性时会生成
MODIFY
事件。删除项目时会生成
REMOVE
事件。
但是,您为管道指定的事件模式应筛选掉所有不是 INSERT
或 MODIFY
的事件。接下来,确认管道已将预期的事件传送到队列。
确认预期的事件已传送到队列。
通过以下网址打开 Amazon SQS 控制台:https://console.aws.amazon.com/sqs/
。 选择
pipe-tutorial-target
队列。Amazon SQS 会显示队列详情页面。
选择发送和接收消息,然后在接收消息下选择轮询消息。
队列会轮询管道,然后列出它收到的事件。
选择事件名称,查看已传送的事件 JSON。
队列中应该有两个事件:一个 eventName
为 INSERT
,另一个 eventName
为 MODIFY
。但是,管道没有传送删除表项目的事件,因为该事件的 eventName
为 REMOVE
,与您在管道中指定的事件模式不匹配。
步骤 3:清理资源
首先,删除管道本身。
使用 EventBridge 控制台删除管道
访问 https://console.aws.amazon.com/events/
,打开 Amazon EventBridge 控制台。 在导航窗格中,选择管道。
选择
pipe-tutorial
管道并选择删除。
然后删除 CloudFormation 堆栈,以免因继续使用其中预置的资源而被收费。
使用 AWS CLI 删除教程先决条件
运行以下 CLI 命令,其中
--stack-name
指定堆栈的名称:aws cloudformation delete-stack --stack-name
pipe-tuturial-resources
使用 AWS CloudFormation 控制台删除教程先决条件
-
打开 AWS CloudFormation 控制台,地址:https://console.aws.amazon.com/cloudformation
。 在堆栈页面上,选择堆栈,然后选择删除。
选择删除确认您的操作。
用于生成先决条件的 AWS CloudFormation 模板
使用下面的 JSON 创建 CloudFormation 模板,用于预置本教程所需的源和目标资源。
{ "AWSTemplateFormatVersion": "2010-09-09", "Description" : "Provisions resources to use with the EventBridge Pipes tutorial. You will be billed for the AWS resources used if you create a stack from this template.", "Parameters" : { "SourceTableName" : { "Type" : "String", "Default" : "pipe-tutorial-source", "Description" : "Specify the name of the table to provision as the pipe source, or accept the default." }, "TargetQueueName" : { "Type" : "String", "Default" : "pipe-tutorial-target", "Description" : "Specify the name of the queue to provision as the pipe target, or accept the default." } }, "Resources": { "PipeTutorialSourceDynamoDBTable": { "Type": "AWS::DynamoDB::Table", "Properties": { "AttributeDefinitions": [{ "AttributeName": "Album", "AttributeType": "S" }, { "AttributeName": "Artist", "AttributeType": "S" } ], "KeySchema": [{ "AttributeName": "Album", "KeyType": "HASH" }, { "AttributeName": "Artist", "KeyType": "RANGE" } ], "ProvisionedThroughput": { "ReadCapacityUnits": 10, "WriteCapacityUnits": 10 }, "StreamSpecification": { "StreamViewType": "NEW_AND_OLD_IMAGES" }, "TableName": { "Ref" : "SourceTableName" } } }, "PipeTutorialTargetQueue": { "Type": "AWS::SQS::Queue", "Properties": { "QueueName": { "Ref" : "TargetQueueName" } } } } }