使用验证、转换和分区编排 ETL 管道 AWS Step Functions - AWS Prescriptive Guidance

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用验证、转换和分区编排 ETL 管道 AWS Step Functions

Sandip Gangapadhyay,亚马逊 Web Services

摘要

此示例介绍了如何构建无服务器提取、转换、加载(ETL)管道,以验证、转换、压缩和分区大型 CSV 数据集,从而实现性能和成本优化。该管道由编排 AWS Step Functions 并包括错误处理、自动重试和用户通知功能。

将 CSV 文件上传至 Amazon Simple Storage Service (Amazon S3) 存储桶文件夹,ETL 管道开始运行。该管道验证源 CSV 文件的内容和架构,将 CSV 文件转换为压缩 Apache Parquet 格式,按年、月和日对数据集进行分区,并将其存储在单独的文件夹中供分析工具处理。

自动执行此模式的代码可在带有 AWS Step Functions存储库的 ETL Pipelin e 中找到。 GitHub

先决条件和限制

先决条件

  • 活跃 AWS 账户的.

  • AWS Command Line Interface (AWS CLI) 已安装并配置为你的 AWS 账户,这样你就可以通过部署 AWS CloudFormation 堆栈来创建 AWS 资源。我们建议使用 AWS CLI 版本 2。有关说明,请参阅 AWS CLI 文档 AWS CLI中的安装或更新到最新版本的。有关配置说明,请参阅 AWS CLI 文档中的配置和凭据文件设置

  • Amazon S3 存储桶。

  • 具有正确架构的 CSV 数据集。(此模式中包含的代码存储库提供了示例 CSV 文件,其中包含您可以使用的正确架构和数据类型。)

  • 支持 AWS Management Console. 的网络浏览器 (请参阅支持的浏览器列表。)

  • AWS Glue 控制台访问权限。

  • AWS Step Functions 控制台访问权限。

限制

  • 在中 AWS Step Functions,保存历史记录日志的最大限制为 90 天。有关更多信息,请参阅 AWS Step Functions 文档中的 Ste p Functions 服务配额

产品版本

  • Python 3.13 适用于 AWS Lambda

  • AWS Glue 版本 4.0

架构

从 S3 源存储桶通过 Step Functions、AWS Glue 和 Amazon SNS 进行 ETL 处理,分为 10 个步骤。

图中所示的工作流包括以下高级步骤:

  1. 用户将 CSV 文件上传至 Amazon S3 中的源文件夹。

  2. Amazon S3 通知事件启动启动 AWS Step Functions 状态机的 AWS Lambda 函数。

  3. Lambda 函数验证原始 CSV 文件架构和数据类型。

  4. 根据验证结果:

    1. 如源文件验证成功,则文件将移至舞台文件夹进行进一步处理。

    2. 如果验证失败,文件将移至错误文件夹,并由 Amazon Simple Notification Service (Amazon SNS) 发送错误通知。

  5. AWS Glue 爬虫从 Amazon S3 的舞台文件夹中创建原始文件的架构。

  6. AWS Glue 作业将原始文件转换、压缩和分区为 Parquet 格式。

  7. 该 AWS Glue 任务还会将文件移动到 Amazon S3 中的转换文件夹。

  8. C AWS Glue rawler 根据转换后的文件创建架构。生成的架构用于任何分析作业。您还可以使用 Amazon Athena 中运行临时查询。

  9. 如果管道在无错误的情况下完成,则架构文件将移至存档文件夹。如遇到任何错误,则会将文件移至错误文件夹。

  10. Amazon SNS 会根据管道完成状态发送通知,指示成功或者失败。

此模式中使用的所有 AWS 资源都是无服务器的。没有需要管理的服务器。

工具

AWS 服务

  • AWS Glue— AWS Glue 是一项完全托管的 ETL 服务,可让客户轻松准备和加载数据进行分析。

  • AWS Step Functions— AWS Step Functions 是一项无服务器编排服务,可让您组合 AWS Lambda 功能和其他功能 AWS 服务 来构建关键业务应用程序。通过 AWS Step Functions 图形控制台,您可以将应用程序的工作流程视为一系列事件驱动的步骤。

  • Amazon S3 – Amazon Simple Storage Service(Amazon S3)是一种对象存储服务,提供行业领先的可扩展性、数据可用性、安全性和性能。

  • Amazon SNS — 亚马逊简单通知服务 (Amazon SNS) Simple Notification 是一项高度可用、耐用、安全、完全 pub/sub 托管的消息服务,可让您分离微服务、分布式系统和无服务器应用程序。

  • AWS Lambda— AWS Lambda 是一项计算服务,允许您在不预配置或管理服务器的情况下运行代码。 AWS Lambda 仅在需要时运行您的代码,并自动缩放,从每天几个请求到每秒数千个请求。

代码

此模式的代码可在带有 AWS Step Functions存储库的 ETL Pipelin e 中找到。 GitHub代码存储库包含以下文件和文件夹:

  • template.yml— 用于创建 ETL 管道的 AWS CloudFormation AWS Step Functions模板。

  • parameter.json — 包含所有参数和参数值。您可更新此文件以更改参数值,如操作说明部分所述。

  • myLayer/python文件夹-包含为此项目创建所需 AWS Lambda 图层所需的 Python 包。

  • lambda 文件夹 - 包含以下 Lambda 函数:

    • move_file.py — 将源数据集移动到存档、转换或错误文件夹。

    • check_crawler.py— 在 AWS Glue 爬网程序发送失败消息之前,根据RETRYLIMIT 环境变量的配置多次检查其状态。

    • start_crawler.py— 启动 AWS Glue 爬行器。

    • start_step_function.py— 开始 AWS Step Functions。

    • start_codebuild.py— 启动 AWS CodeBuild 项目。

    • validation.py — 验证输入的原始数据集。

    • s3object.py— 在 Amazon S3 存储桶内创建所需的目录结构。

    • notification.py — 在管道结束时发送成功或错误通知。

若要使用示例代码,请按照操作部分的说明执行。

操作说明

Task描述所需技能

克隆示例代码存储库。

  1. 使用 AWS Step Functions存储库打开 ETL 管道

  2. 在主存储库页面的文件列表上方选择代码,然后复制使用 HTTPS 克隆 下列出的 URL。

  3. 将工作目录更改为要存储示例文件的位置。

  4. 在终端或命令提示符处,输入命令:

    git clone <repoURL>

    其中,<repoURL> 是指您在步骤 2 中复制的 URL。

开发人员

更新参数值。

在存储库本地副本中,编辑 parameter.json 文件并更新默认参数值,如下所示:

  • pS3BucketName─ 用于存储数据集的 Amazon S3 存储桶的名称。此模板将为您创建此存储桶。存储桶名称必须全局唯一。

  • pSourceFolder─ Amazon S3 存储桶中将用于上传源 CSV 文件的文件夹的名称。

  • pStageFolder─ Amazon S3 存储桶中将在该过程中用作暂存区域的文件夹的名称。

  • pTransformFolder─ Amazon S3 存储桶内用于存储已转换和分区数据集的文件夹名称。

  • pErrorFolder─ 如果无法验证源 CSV 文件,则该文件将移至 Amazon S3 存储桶内的文件夹。

  • pArchiveFolder ─ Amazon S3 存储桶中将用于存档源 CSV 文件的文件夹的名称。

  • pEmailforNotification─ 用于接收 success/error 通知的有效电子邮件地址。

  • pPrefix─ 将在 AWS Glue 爬虫名称中使用的前缀字符串。

  • pDatasetSchema ─ 将对源文件进行验证的数据集架构。Cerberus Python 数据包用于源数据集验证。有关更多信息,请参阅 Cerberus 网站。

开发人员

将源代码上传到 Amazon S3 存储桶。

在部署自动执行 ETL 管道的 AWS CloudFormation 模板之前,必须打包模板的源文件并将其上传到 Amazon S3 存储桶。为此,请使用预先配置的配置文件运行以下 AWS CLI 命令:

aws cloudformation package --template-file template.yml --s3-bucket <bucket_name> --output-template-file packaged.template --profile <profile_name>

其中:

  • <bucket_name>是您要在 AWS 区域 其中部署堆栈的现有 Amazon S3 存储桶的名称。此存储桶用于存储 CloudFormation 模板的源代码包。

  • <profile_name>是您在设置 AWS CLI时预先配置的有效配置 AWS CLI 文件。

开发人员
Task描述所需技能

部署 CloudFormation 模板。

要部署 AWS CloudFormation 模板,请运行以下 AWS CLI 命令:

aws cloudformation deploy --stack-name <stack_name> --template-file packaged.template --parameter-overrides file://parameter.json --capabilities CAPABILITY_IAM --profile <profile_name>

其中:

  • <stack_name>是 CloudFormation 堆栈的唯一标识符。

  • <profile-name>是您预先配置的 AWS CLI 配置文件。

开发人员

查看进度。

AWS CloudFormation 控制台上,查看堆栈开发进度。当状态为时 CREATE_COMPLETE,表示堆栈已成功部署。

开发人员

记下 AWS Glue 数据库名称。

堆栈的 “输出” 选项卡显示 AWS Glue 数据库的名称。键名称为 GlueDBOutput

开发人员
Task描述所需技能

启动 ETL 管道。

  1. 导航到 Amazon S3 存储桶内的源文件夹(source或您在parameter.json文件中设置的文件夹名称)。

  2. 将示例 CSV 文件上传至此文件夹。(代码存储库提供了一个名为 Sample_Bank_Transaction_Raw_Dataset.csv 的示例文件以供您使用。) 上传文件后,将通过 Step Functions 启动 ETL 管道。

  3. Step Functions 控制台,检查 ETL 管道状态。

开发人员

查看分区数据集。

ETL 管道完成后,确认分区数据集在 Amazon S3 转换文件夹(transform 或您在 parameter.json 文件中设置的文件夹名称)中可用。

开发人员

检查分区 AWS Glue 数据库。

  1. AWS Glue 控制台上,选择堆栈创建 AWS Glue 的数据库(这是您在上一篇长篇故事中记下的数据库)。

  2. 确认分区表在中可用。 AWS Glue Data Catalog

开发人员

运行查询。

(可选)使用 Amazon Athena 对已分区和转换的数据库运行临时查询。有关说明,请参阅文档中的在 Amazon Athena 中运行 SQL 查询。 AWS

数据库分析师

故障排除

事务解决方案

AWS Identity and Access Management AWS Glue 任务和爬虫的 (IAM) 权限

如果您进一步自定义 AWS Glue 任务或抓取工具,请务必在 AWS Glue 任务使用的 IAM 角色中授予相应的 IAM 权限,或者向提供数据权限。 AWS Lake Formation有关更多信息,请参阅 AWS 文档

相关资源

AWS 服务 文档

其他信息

下图显示了 Insp AWS Step Functions ector 面板中成功建立 ETL 管道 AWS Step Functions 的工作流程。

Step Functions 工作流程,用于验证输入的.csv、抓取数据和运行 AWS Glue 作业。

下图显示了 Step Functions Insp ec tor 面板中因输入验证错误而失败的 ETL 管道 AWS Step Functions 的工作流程。

Step Functions 工作流程失败,因此文件移动到错误文件夹。