使用 Kinesis Data Streams 和 Firehose 将 DynamoDB 记录传送到亚马逊 S3 AWS CDK - AWS Prescriptive Guidance

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Kinesis Data Streams 和 Firehose 将 DynamoDB 记录传送到亚马逊 S3 AWS CDK

由 Shashank Shrivastava () 和 Daniel Matuki da Cunha (AWS) 创作 AWS

代码存储库:亚马逊 DynamoDB 提取到亚马逊 S3

环境:PoC 或试点

技术:无服务器、数据湖、数据库、存储和备份

AWS服务:AWSCDK;亚马逊 DynamoDB;亚马逊 Data Firehose;亚马逊 Kinesis Data Streams;Lambda;亚马逊 S3 AWS

Summary

此模式提供了使用亚马逊 Kinesis Data Streams 和 Amazon Data Firehose 将记录从亚马逊 DynamoDB 传输到亚马逊简单存储服务 (Amazon S3) 的示例代码和应用程序。该模式的方法使用 AWS Cloud Development Kit (AWS CDK) L3 结构,并包括一个示例,说明如何在将数据传输到 Amazon Web Services (AWS) 云上的目标 S3 存储桶 AWS Lambda 之前执行数据转换。

Kinesis Data Streams 记录 DynamoDB 表中的项目级别修改,并将它们按要求复制到 Kinesis Data Stream。您的应用程序可以访问 Kinesis 数据流,近实时查看项目级别的更改。Kinesis Data Streams 还提供对其他亚马逊 Kinesis 服务的访问权限,例如 Firehose 和适用于 Apache Flink 的亚马逊托管服务。这意味着您可构建应用程序,以提供实时控制面板、生成警报、实施动态定价和广告以及执行复杂数据分析。

您可将此模式用于数据集成用例。例如,运输车辆或工业设备可将大量数据发送至 DynamoDB 表中。然后,可以转换这些数据,并将其存储至 Amazon S3 中托管的数据湖中。然后,您可以使用无服务器服务(例如亚马逊 Athena、Amazon Redshift Spectrum、Amazon Rekognition 和)查询和处理数据并预测任何潜在的缺陷。 AWS Glue

先决条件和限制

先决条件

架构

下图显示了使用 Kinesis Data Streams 和 Firehose 将记录从 DynamoDB 传输到 Amazon S3 的示例工作流程。

使用 Kinesis Data Streams 和 Firehose 将记录从 DynamoDB 传输到亚马逊 S3 的示例工作流程。

图表显示了以下工作流:

  1. 使用 Amazon Gate API way 作为 DynamoDB 的代理来摄取数据。您也可以使用任何其他来源,将数据采集至 DynamoDB。 

  2. 在 Kinesis Data Streams 中近乎实时生成项目级更改,然后传送至 Amazon S3。

  3. Kinesis Data Streams 将记录发送到 Firehose 进行转换和交付。 

  4. Lambda 函数将记录从 DynamoDB 记录JSON格式转换为仅包含记录项目属性名称和值的格式。

工具

AWS 服务

代码存储库

此模式的代码可在 GitHub aws-dynamodb-kinesisfirehose-s3 摄取存储库中找到。

操作说明

任务描述所需技能

安装依赖项。

在本地计算机上,通过运行以下命令,为 pattern/aws-dynamodb-kinesisstreams-s3sample-application 目录中的 package.json 文件安装依赖项:

cd <project_root>/pattern/aws-dynamodb-kinesisstreams-s3
npm install && npm run build
cd <project_root>/sample-application/
npm install && npm run build

 

应用程序开发者,一般 AWS

生成 CloudFormation 模板。

  1. 运行 cd <project_root>/sample-application/ 命令。

  2. 运行cdk synth命令生成 CloudFormation 模板。

  3. AwsDynamodbKinesisfirehoseS3IngestionStack.template.json 输出存储在 cdk.out 目录中。

  4. 使用 AWS CDK 或 AWS Management Console 在中处理模板 CloudFormation。

应用程序开发者、常规AWS、AWS DevOps
任务描述所需技能

检查和部署资源。

  1. 运行cdk diff命令以识别由 AWS CDK 构造创建的资源类型。

  2. 运行 cdk deploy 命令以部署资源。

应用程序开发者、常规AWS、AWS DevOps
任务描述所需技能

将您的示例数据摄取至 DynamoDB 表中。

在中运行以下命令,向你的 DynamoDB 表发送请求: AWS CLI

aws dynamodb put-item --table-name <your_table_name> --item '{"<table_partition_key>": {"S": "<partition_key_ID>"},"MessageData":{"S": "<data>"}}'

示例:

aws dynamodb put-item --table-name SourceData_table --item '{"SourceDataId": {"S": "123"},"MessageData":{"S": "Hello World"}}'

默认情况下,如果操作成功,put-item 不返回任何值作为输出。如果操作失败,则会返回错误。数据存储在 DynamoDB 中,然后发送到 Kinesis Data Streams 和 Firehose。 

注意:您可以使用不同方法向 DynamoDB 表中添加数据。有关更多信息,请参阅 DynamoDB 文档中的将数据加载到表中

应用程序开发人员

验证是否在 S3 存储桶中创建了新对象。

登录 AWS Management Console 并监控 S3 存储桶,以验证是否使用您发送的数据创建了新对象。 

有关更多信息,请参阅 GetObjectAmazon S3 文档中的。

应用程序开发者,一般 AWS
任务描述所需技能

清理资源。

运行 cdk destroy 命令以删除此模式使用的所有资源。

应用程序开发者,一般 AWS

相关资源