本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用验证、转换和分区编排 ETL 管道 AWS Step Functions
Sandip Gangapadhyay,亚马逊 Web Services
摘要
此示例介绍了如何构建无服务器提取、转换、加载(ETL)管道,以验证、转换、压缩和分区大型 CSV 数据集,从而实现性能和成本优化。该管道由编排 AWS Step Functions 并包括错误处理、自动重试和用户通知功能。
将 CSV 文件上传至 Amazon Simple Storage Service (Amazon S3) 存储桶文件夹,ETL 管道开始运行。该管道验证源 CSV 文件的内容和架构,将 CSV 文件转换为压缩 Apache Parquet 格式,按年、月和日对数据集进行分区,并将其存储在单独的文件夹中供分析工具处理。
自动执行此模式的代码可在带有 AWS Step Functions存储库的 ETL Pipelin
先决条件和限制
先决条件
活跃 AWS 账户的.
AWS Command Line Interface (AWS CLI) 已安装并配置为你的 AWS 账户,这样你就可以通过部署 AWS CloudFormation 堆栈来创建 AWS 资源。我们建议使用 AWS CLI 版本 2。有关说明,请参阅 AWS CLI 文档 AWS CLI中的安装或更新到最新版本的。有关配置说明,请参阅 AWS CLI 文档中的配置和凭据文件设置。
Amazon S3 存储桶。
具有正确架构的 CSV 数据集。(此模式中包含的代码存储库
提供了示例 CSV 文件,其中包含您可以使用的正确架构和数据类型。) 支持 AWS Management Console. 的网络浏览器 (请参阅支持的浏览器列表
。) AWS Glue 控制台访问权限。
AWS Step Functions 控制台访问权限。
限制
在中 AWS Step Functions,保存历史记录日志的最大限制为 90 天。有关更多信息,请参阅 AWS Step Functions 文档中的 Ste p Functions 服务配额。
产品版本
Python 3.13 适用于 AWS Lambda
AWS Glue 版本 4.0
架构

图中所示的工作流包括以下高级步骤:
用户将 CSV 文件上传至 Amazon S3 中的源文件夹。
Amazon S3 通知事件启动启动 AWS Step Functions 状态机的 AWS Lambda 函数。
Lambda 函数验证原始 CSV 文件架构和数据类型。
根据验证结果:
如源文件验证成功,则文件将移至舞台文件夹进行进一步处理。
如果验证失败,文件将移至错误文件夹,并由 Amazon Simple Notification Service (Amazon SNS) 发送错误通知。
AWS Glue 爬虫从 Amazon S3 的舞台文件夹中创建原始文件的架构。
AWS Glue 作业将原始文件转换、压缩和分区为 Parquet 格式。
该 AWS Glue 任务还会将文件移动到 Amazon S3 中的转换文件夹。
C AWS Glue rawler 根据转换后的文件创建架构。生成的架构用于任何分析作业。您还可以使用 Amazon Athena 中运行临时查询。
如果管道在无错误的情况下完成,则架构文件将移至存档文件夹。如遇到任何错误,则会将文件移至错误文件夹。
Amazon SNS 会根据管道完成状态发送通知,指示成功或者失败。
此模式中使用的所有 AWS 资源都是无服务器的。没有需要管理的服务器。
工具
AWS 服务
AWS Glue
— AWS Glue 是一项完全托管的 ETL 服务,可让客户轻松准备和加载数据进行分析。 AWS Step Functions
— AWS Step Functions 是一项无服务器编排服务,可让您组合 AWS Lambda 功能和其他功能 AWS 服务 来构建关键业务应用程序。通过 AWS Step Functions 图形控制台,您可以将应用程序的工作流程视为一系列事件驱动的步骤。 Amazon S3
– Amazon Simple Storage Service(Amazon S3)是一种对象存储服务,提供行业领先的可扩展性、数据可用性、安全性和性能。 Amazon SNS
— 亚马逊简单通知服务 (Amazon SNS) Simple Notification 是一项高度可用、耐用、安全、完全 pub/sub 托管的消息服务,可让您分离微服务、分布式系统和无服务器应用程序。 AWS Lambda
— AWS Lambda 是一项计算服务,允许您在不预配置或管理服务器的情况下运行代码。 AWS Lambda 仅在需要时运行您的代码,并自动缩放,从每天几个请求到每秒数千个请求。
代码
此模式的代码可在带有 AWS Step Functions存储库的 ETL Pipelin
template.yml
— 用于创建 ETL 管道的 AWS CloudFormation AWS Step Functions模板。parameter.json
— 包含所有参数和参数值。您可更新此文件以更改参数值,如操作说明部分所述。myLayer/python
文件夹-包含为此项目创建所需 AWS Lambda 图层所需的 Python 包。lambda
文件夹 - 包含以下 Lambda 函数:move_file.py
— 将源数据集移动到存档、转换或错误文件夹。check_crawler.py
— 在 AWS Glue 爬网程序发送失败消息之前,根据RETRYLIMIT
环境变量的配置多次检查其状态。start_crawler.py
— 启动 AWS Glue 爬行器。start_step_function.py
— 开始 AWS Step Functions。start_codebuild.py
— 启动 AWS CodeBuild 项目。validation.py
— 验证输入的原始数据集。s3object.py
— 在 Amazon S3 存储桶内创建所需的目录结构。notification.py
— 在管道结束时发送成功或错误通知。
若要使用示例代码,请按照操作部分的说明执行。
操作说明
Task | 描述 | 所需技能 |
---|---|---|
克隆示例代码存储库。 |
| 开发人员 |
更新参数值。 | 在存储库本地副本中,编辑
| 开发人员 |
将源代码上传到 Amazon S3 存储桶。 | 在部署自动执行 ETL 管道的 AWS CloudFormation 模板之前,必须打包模板的源文件并将其上传到 Amazon S3 存储桶。为此,请使用预先配置的配置文件运行以下 AWS CLI 命令:
其中:
| 开发人员 |
Task | 描述 | 所需技能 |
---|---|---|
部署 CloudFormation 模板。 | 要部署 AWS CloudFormation 模板,请运行以下 AWS CLI 命令:
其中:
| 开发人员 |
查看进度。 | 在AWS CloudFormation 控制台 | 开发人员 |
记下 AWS Glue 数据库名称。 | 堆栈的 “输出” 选项卡显示 AWS Glue 数据库的名称。键名称为 | 开发人员 |
Task | 描述 | 所需技能 |
---|---|---|
启动 ETL 管道。 |
| 开发人员 |
查看分区数据集。 | ETL 管道完成后,确认分区数据集在 Amazon S3 转换文件夹( | 开发人员 |
检查分区 AWS Glue 数据库。 |
| 开发人员 |
运行查询。 | (可选)使用 Amazon Athena 对已分区和转换的数据库运行临时查询。有关说明,请参阅文档中的在 Amazon Athena 中运行 SQL 查询。 AWS | 数据库分析师 |
故障排除
事务 | 解决方案 |
---|---|
AWS Identity and Access Management AWS Glue 任务和爬虫的 (IAM) 权限 | 如果您进一步自定义 AWS Glue 任务或抓取工具,请务必在 AWS Glue 任务使用的 IAM 角色中授予相应的 IAM 权限,或者向提供数据权限。 AWS Lake Formation有关更多信息,请参阅 AWS 文档。 |
相关资源
AWS 服务 文档
其他信息
下图显示了 Insp AWS Step Functions ector 面板中成功建立 ETL 管道 AWS Step Functions 的工作流程。

下图显示了 Step Functions Insp ec tor 面板中因输入验证错误而失败的 ETL 管道 AWS Step Functions 的工作流程。
