本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用AWS DataOps 开发套件构建数据管道,以提取、转换和分析 Google Analytics(分析)数据
由 Anton Kukushkin (AWS) 和 Rudy Puig () 创作 AWS
摘要
此模式描述了如何使用AWS DataOps 开发套件 (DDK) 和其他AWS服务构建数据管道来提取、转换和分析 Google Analytics(分析)数据。AWSDDK是一个开源开发框架,可帮助您在上面构建数据工作流程和现代数据架构AWS。的主要目标之一AWSDDK是为您节省通常用于劳动密集型数据管道任务的时间和精力,例如协调管道、构建基础设施和创建基础架构。 DevOps 您可以将这些劳动密集型任务转移到,AWSDDK这样您就可以专注于编写代码和其他高价值的活动。
先决条件和限制
先决条件
产品版本
Python 3.7 或更高版本
pip 9.0.3 或更高版本
架构
技术堆栈
Amazon AppFlow
Amazon Athena
Amazon CloudWatch
Amazon EventBridge
Amazon Simple Storage Service(Amazon S3)
亚马逊简单队列服务(亚马逊SQS)
AWS DataOps 开发套件 (DDK)
AWS Lambda
目标架构
下图显示了摄取、转换和分析 Google Analytics 数据的事件驱动流程。
图表显示了以下工作流:
亚马逊 CloudWatch 计划的事件规则会调用亚马逊。 AppFlow
亚马逊将谷 AppFlow 歌分析数据提取到 S3 存储桶中。
在 S3 存储桶提取数据后,系统会生成中的 EventBridge 事件通知,由 CloudWatch 事件规则捕获,然后将其放入 Amazon SQS 队列中。
Lambda 函数使用来自亚马逊SQS队列的事件,读取相应的 S3 对象,将对象转换为 Apache Parquet 格式,将转换后的对象写入 S3 存储桶,然后创建或更新 Glue 数据目录表定义AWS。
Athena 查询针对此表运行。
工具
AWS工具
Amazon AppFlow 是一项完全托管的集成服务,使您能够在软件即服务 (SaaS) 应用程序之间安全地交换数据。
Amazon Athena 是一项交互式查询服务,可帮助您使用标准直接在 Amazon S3 中分析数据。SQL
Amazon CloudWatch 可帮助您实时监控您的AWS资源和运行的应用程序AWS的指标。
Amazon EventBridge 是一项无服务器事件总线服务,可帮助您将应用程序与来自各种来源的实时数据连接起来。例如,AWSLambda 函数、使用API目标的HTTP调用终端节点或其他账户中的事件总线。AWS
Amazon Simple Storage Service (Amazon S3) 是一项基于云的对象存储服务,可帮助您存储、保护和检索任意数量的数据。
Amazon Simple Queue Service (AmazonSQS) 提供安全、耐用且可用的托管队列,可帮助您集成和分离分布式软件系统和组件。
AWSLambda 是一项计算服务,可帮助您运行代码,而无需预置或管理服务器。它仅在需要时运行您的代码,并且能自动扩缩,因此您只需为使用的计算时间付费。
AWSCloud Development Kit (CDK) 是一个框架,用于在代码中定义云基础架构并通过它进行配置AWS CloudFormation。
AWS DataOps Developmen@@ t Kit (DDK)
是一个开源开发框架,可帮助你在其上构建数据工作流程和现代数据架构AWS。
代码
此模式的代码可在 GitHub AWS DataOps 开发套件 (DDK)
操作说明
任务 | 描述 | 所需技能 |
---|---|---|
克隆源代码。 | 要克隆源代码,请运行以下命令:
| DevOps 工程师 |
创建虚拟环境。 | 导航到源代码目录,然后运行以下命令创建虚拟环境:
| DevOps 工程师 |
安装依赖项。 | 要激活虚拟环境并安装依赖项,请运行以下命令:
| DevOps 工程师 |
任务 | 描述 | 所需技能 |
---|---|---|
引导环境。 |
| DevOps 工程师 |
部署数据。 | 要部署数据管线,请运行 | DevOps 工程师 |
任务 | 描述 | 所需技能 |
---|---|---|
验证堆栈状态。 |
| DevOps 工程师 |
故障排除
事务 | 解决方案 |
---|---|
如果在创建 | 确认您已为 Google Analytics(分析)创建了亚马逊 AppFlow 连接器并将其命名 有关说明,请参阅亚马逊 AppFlow 文档中的谷歌分析。 |
相关资源
AWS DataOps 开发套件 (DDK)
(GitHub) AWSDDK示例
(GitHub)
其他信息
AWSDDK数据管道由一个或多个阶段组成。在以下代码示例中,您使用 AppFlowIngestionStage
从 Google Analytics 摄取数据,使用 SqsToLambdaStage
处理数据转换,使用 AthenaSQLStage
运行 Athena 查询。
首先,创建数据转换和摄取阶段,如以下代码示例所示:
appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )
接下来,使用该DataPipeline
构造通过使用 EventBridge 规则将各个阶段 “连接” 在一起,如以下代码示例所示:
( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )
有关更多代码示例,请参阅使用亚马逊、Amazon AppFlow Athena AWS DataOps 和开发套件 GitHub 分析谷歌分析数据存储库