建置資料管道,以使用AWS DataOps 開發套件擷取、轉換和分析 Google Analytics 資料 - AWS 方案指引

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

建置資料管道,以使用AWS DataOps 開發套件擷取、轉換和分析 Google Analytics 資料

由 Anton Kukushkin (AWS) 和 Rudy Puig (AWS) 建立

程式碼儲存庫:AWSDDK範例 - 使用 Amazon AppFlow、Amazon Athena 和 AWS DataOps 開發套件分析 Google Analytics 資料

環境:PoC 或試行

技術: DataLakes;分析; DevOps;基礎設施

工作負載:開放原始碼

AWS 服務:Amazon AppFlow;Amazon Athena ;AWSCDK;AWSLambda;Amazon S3

Summary

此模式說明如何使用AWS DataOps 開發套件 (DDK) 和其他 AWS服務建置資料管道來擷取、轉換和分析 Google Analytics 資料。AWS DDK 是開放原始碼開發架構,可協助您在 上建置資料工作流程和現代資料架構AWS。的主要目標之一AWSDDK是為您節省通常用於勞力密集型資料管道任務的時間和精力,例如協調管道、建置基礎設施,以及在該基礎設施 DevOps 背後建立 。您可以將這些耗力密集型任務卸載至 ,AWSDDK以便專注於編寫程式碼和其他高價值活動。

先決條件和限制

先決條件

  • 作用中AWS帳戶

  • 設定 Google Analytics 的 Amazon AppFlow 連接器

  • Pythonpip (Python 的套件管理員)

  • Git、已安裝和設定

  • AWS 命令列介面 (AWS CLI),已安裝設定

  • AWS 雲端開發套件 (AWS CDK),已安裝

產品版本

  • Python 3.7 或更高版本

  • pip 9.0.3 或更新版本

架構

技術堆疊

  • Amazon AppFlow

  • Amazon Athena

  • Amazon CloudWatch

  • Amazon EventBridge

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon Simple Queue Service (Amazon SQS)

  • AWS DataOps 開發套件 (DDK)

  • AWS Lambda

目標架構

下圖顯示擷取、轉換和分析 Google Analytics 資料的事件驅動程序。

Architecutre 圖表

該圖顯示以下工作流程:

  1. Amazon CloudWatch 排程事件規則會叫用 Amazon AppFlow。

  2. Amazon 會將 Google Analytics 資料 AppFlow 擷取到 S3 儲存貯體。

  3. 在 S3 儲存貯體擷取資料之後, 中的事件通知 EventBridge 會產生、由 CloudWatch 事件規則擷取,然後放入 Amazon SQS佇列。

  4. Lambda 函數會耗用來自 Amazon SQS佇列的事件、讀取個別的 S3 物件、將物件轉換為 Apache Parquet 格式、將轉換的物件寫入 S3 儲存貯體,然後建立或更新 AWS Glue Data Catalog 資料表定義。

  5. Athena 查詢會針對資料表執行。

工具

AWS 工具

  • Amazon AppFlow 是一種完全受管的整合服務,可讓您在軟體即服務 (SaaS 應用程式之間安全地交換資料。

  • Amazon Athena 是一種互動式查詢服務,可協助您使用標準 直接分析 Amazon S3 中的資料SQL。

  • Amazon CloudWatch 可協助您AWS即時監控AWS資源的指標,以及您在 上執行的應用程式。

  • Amazon EventBridge 是一種無伺服器事件匯流排服務,可協助您將應用程式與各種來源的即時資料連線。例如,AWSLambda 函數、使用API目的地HTTP叫用端點,或其他AWS帳戶中的事件匯流排。

  • Amazon Simple Storage Service (Amazon S3) 是一種雲端型物件儲存服務,可協助您儲存、保護和擷取任何數量的資料。

  • Amazon Simple Queue Service (Amazon SQS) 提供安全、耐用且可用的託管佇列,可協助您整合和解耦分散式軟體系統和元件。

  • AWS Lambda 是一種運算服務,可協助您執行程式碼,而不需要佈建或管理伺服器。它只會在需要時執行程式碼並自動擴展,因此您只需支付您使用的運算時間。

  • AWS 雲端開發套件 (CDK) 是用於在程式碼中定義雲端基礎設施並透過 佈建雲端基礎設施的架構AWS CloudFormation。

  • AWS DataOps 開發套件 (DDK) 是一種開放原始碼開發架構,可協助您在 上建置資料工作流程和現代資料架構AWS。

Code

此模式的程式碼可在 GitHub AWS DataOps 開發套件 (DDK) 和使用 Amazon AppFlow、Amazon Athena 和AWS DataOps 開發套件儲存庫分析 Google Analytics 資料中找到。

史詩

任務描述所需的技能

複製原始程式碼。

若要複製原始程式碼,請執行下列命令:

git clone https://github.com/aws-samples/aws-ddk-examples.git
DevOps 工程師

建立虛擬環境。

導覽至原始程式碼目錄,然後執行下列命令來建立虛擬環境:

cd google-analytics-data-using-appflow/python && python3 -m venv .venv
DevOps 工程師

安裝相依性。

若要啟用虛擬環境並安裝相依性,請執行下列命令:

source .venv/bin/activate && pip install -r requirements.txt
DevOps 工程師
任務描述所需的技能

啟動環境。

  1. 確認 AWS CLI 已使用AWS您的帳戶的有效憑證設定 。如需詳細資訊,請參閱 文件中的使用具名設定檔AWSCLI。

  2. 執行 cdk bootstrap --profile [AWS_PROFILE] 命令。

DevOps 工程師

部署資料。

若要部署資料管道,請執行 cdk deploy --profile [AWS_PROFILE]命令。

DevOps 工程師
任務描述所需的技能

驗證堆疊狀態。

  1. 開啟AWS CloudFormation 主控台

  2. 堆疊頁面上,確認堆疊的狀態DdkAppflowAthenaStackCREATE_COMPLETE

DevOps 工程師

故障診斷

問題解決方案

建立AWS::AppFlow::Flow資源期間部署失敗,且您會收到下列錯誤: Connector Profile with name ga-connection does not exist

確認您已建立 Google Analytics 的 Amazon AppFlow 連接器並命名為 ga-connection

如需指示,請參閱 Amazon AppFlow 文件中的 Google Analytics

相關資源

其他資訊

AWS DDK 資料管道由一或多個階段組成。在下列程式碼範例中,您會使用 從 Google Analytics AppFlowIngestionStage 擷取資料、SqsToLambdaStage處理資料轉換,以及AthenaSQLStage執行 Athena 查詢。

首先,會建立資料轉換和擷取階段,如下列程式碼範例所示:

appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )

接下來,DataPipeline建構會使用 EventBridge 規則將階段串連在一起,如下列程式碼範例所示:

( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )

如需更多程式碼範例,請參閱 GitHub 使用 Amazon AppFlow、Amazon Athena 和AWS DataOps 開發套件儲存庫分析 Google Analytics 資料