本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
建置資料管道,以使用AWS DataOps 開發套件擷取、轉換和分析 Google Analytics 資料
由 Anton Kukushkin (AWS) 和 Rudy Puig (AWS) 建立
程式碼儲存庫:AWSDDK範例 - 使用 Amazon AppFlow、Amazon Athena 和 AWS DataOps 開發套件分析 Google Analytics 資料 | 環境:PoC 或試行 | 技術: DataLakes;分析; DevOps;基礎設施 |
工作負載:開放原始碼 | AWS 服務:Amazon AppFlow;Amazon Athena ;AWSCDK;AWSLambda;Amazon S3 |
Summary
此模式說明如何使用AWS DataOps 開發套件 (DDK) 和其他 AWS服務建置資料管道來擷取、轉換和分析 Google Analytics 資料。AWS DDK 是開放原始碼開發架構,可協助您在 上建置資料工作流程和現代資料架構AWS。的主要目標之一AWSDDK是為您節省通常用於勞力密集型資料管道任務的時間和精力,例如協調管道、建置基礎設施,以及在該基礎設施 DevOps 背後建立 。您可以將這些耗力密集型任務卸載至 ,AWSDDK以便專注於編寫程式碼和其他高價值活動。
先決條件和限制
先決條件
產品版本
Python 3.7 或更高版本
pip 9.0.3 或更新版本
架構
技術堆疊
Amazon AppFlow
Amazon Athena
Amazon CloudWatch
Amazon EventBridge
Amazon Simple Storage Service (Amazon S3)
Amazon Simple Queue Service (Amazon SQS)
AWS DataOps 開發套件 (DDK)
AWS Lambda
目標架構
下圖顯示擷取、轉換和分析 Google Analytics 資料的事件驅動程序。
該圖顯示以下工作流程:
Amazon CloudWatch 排程事件規則會叫用 Amazon AppFlow。
Amazon 會將 Google Analytics 資料 AppFlow 擷取到 S3 儲存貯體。
在 S3 儲存貯體擷取資料之後, 中的事件通知 EventBridge 會產生、由 CloudWatch 事件規則擷取,然後放入 Amazon SQS佇列。
Lambda 函數會耗用來自 Amazon SQS佇列的事件、讀取個別的 S3 物件、將物件轉換為 Apache Parquet 格式、將轉換的物件寫入 S3 儲存貯體,然後建立或更新 AWS Glue Data Catalog 資料表定義。
Athena 查詢會針對資料表執行。
工具
AWS 工具
Amazon AppFlow 是一種完全受管的整合服務,可讓您在軟體即服務 (SaaS 應用程式之間安全地交換資料。
Amazon Athena 是一種互動式查詢服務,可協助您使用標準 直接分析 Amazon S3 中的資料SQL。
Amazon CloudWatch 可協助您AWS即時監控AWS資源的指標,以及您在 上執行的應用程式。
Amazon EventBridge 是一種無伺服器事件匯流排服務,可協助您將應用程式與各種來源的即時資料連線。例如,AWSLambda 函數、使用API目的地HTTP叫用端點,或其他AWS帳戶中的事件匯流排。
Amazon Simple Storage Service (Amazon S3) 是一種雲端型物件儲存服務,可協助您儲存、保護和擷取任何數量的資料。
Amazon Simple Queue Service (Amazon SQS) 提供安全、耐用且可用的託管佇列,可協助您整合和解耦分散式軟體系統和元件。
AWS Lambda 是一種運算服務,可協助您執行程式碼,而不需要佈建或管理伺服器。它只會在需要時執行程式碼並自動擴展,因此您只需支付您使用的運算時間。
AWS 雲端開發套件 (CDK) 是用於在程式碼中定義雲端基礎設施並透過 佈建雲端基礎設施的架構AWS CloudFormation。
AWS DataOps 開發套件 (DDK)
是一種開放原始碼開發架構,可協助您在 上建置資料工作流程和現代資料架構AWS。
Code
此模式的程式碼可在 GitHub AWS DataOps 開發套件 (DDK)
史詩
任務 | 描述 | 所需的技能 |
---|---|---|
複製原始程式碼。 | 若要複製原始程式碼,請執行下列命令:
| DevOps 工程師 |
建立虛擬環境。 | 導覽至原始程式碼目錄,然後執行下列命令來建立虛擬環境:
| DevOps 工程師 |
安裝相依性。 | 若要啟用虛擬環境並安裝相依性,請執行下列命令:
| DevOps 工程師 |
任務 | 描述 | 所需的技能 |
---|---|---|
啟動環境。 |
| DevOps 工程師 |
部署資料。 | 若要部署資料管道,請執行 | DevOps 工程師 |
任務 | 描述 | 所需的技能 |
---|---|---|
驗證堆疊狀態。 |
| DevOps 工程師 |
故障診斷
問題 | 解決方案 |
---|---|
建立 | 確認您已建立 Google Analytics 的 Amazon AppFlow 連接器並命名為 如需指示,請參閱 Amazon AppFlow 文件中的 Google Analytics。 |
相關資源
AWS DataOps 開發套件 (DDK)
(GitHub) AWS DDK 範例
(GitHub)
其他資訊
AWS DDK 資料管道由一或多個階段組成。在下列程式碼範例中,您會使用 從 Google Analytics AppFlowIngestionStage
擷取資料、SqsToLambdaStage
處理資料轉換,以及AthenaSQLStage
執行 Athena 查詢。
首先,會建立資料轉換和擷取階段,如下列程式碼範例所示:
appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )
接下來,DataPipeline
建構會使用 EventBridge 規則將階段串連在一起,如下列程式碼範例所示:
( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )
如需更多程式碼範例,請參閱 GitHub 使用 Amazon AppFlow、Amazon Athena 和AWS DataOps 開發套件儲存庫分析 Google Analytics 資料