Development AWS DataOps Kit を使用して Google Analytics データを取り込み、変換、分析するためのデータパイプラインを構築する - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Development AWS DataOps Kit を使用して Google Analytics データを取り込み、変換、分析するためのデータパイプラインを構築する

作成者: Anton Kukushkin (AWS) と Rudy Puig (AWS)

コードリポジトリ: AWS DDK 例 - Amazon 、Amazon Athena AppFlow、および AWS DataOps Development Kit を使用した Google Analytics データの分析

環境:PoC またはパイロット

テクノロジー: DataLakes、分析 DevOps、インフラストラクチャ

ワークロード: オープンソース

AWS サービス: Amazon AppFlow、Amazon Athena AWS 、CDK、AWSLambda、Amazon S3

[概要]

このパターンでは、AWS DataOps Development Kit (DDK) やその他の AWSサービスを使用して Google Analytics データを取り込み、変換、分析するためのデータパイプラインを構築する方法について説明します。AWS DDK は、 でのデータワークフローと最新のデータアーキテクチャの構築に役立つオープンソースの開発フレームワークですAWS。の主な目的の 1 AWSDDKつは、パイプラインのオーケストレーション、インフラストラクチャの構築、そのインフラストラクチャの DevOps 背後にある の作成など、通常、労力のかかるデータパイプラインタスクに費やす時間と労力を節約することです。これらの労働集約的なタスクを にオフロードAWSDDKして、コードやその他の価値の高いアクティビティの作成に集中できます。

前提条件と制限

前提条件

  • アクティブなAWSアカウント

  • Google Analytics 用の Amazon AppFlow コネクタ、設定済み

  • Pythonpip (Python のパッケージマネージャ)

  • インストールおよび設定済みの Git

  • AWS コマンドラインインターフェイス (AWSCLI)、インストールおよび設定済み

  • AWS クラウド開発キット (AWSCDK)、インストール済み

製品バージョン

  • Python 3.7 以降

  • pip 9.0.3 以降

アーキテクチャ

テクノロジースタック

  • Amazon AppFlow

  • Amazon Athena

  • Amazon CloudWatch

  • Amazon EventBridge

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon Simple Queue Service (Amazon SQS)

  • AWS DataOps 開発キット (DDK)

  • AWS Lambda

ターゲット アーキテクチャ

次の図は、Google アナリティクスのデータを取り込み、変換、分析するイベント駆動型のプロセスを示しています。

アーキテクチャ図

この図表は、次のワークフローを示しています:

  1. Amazon スケジュール CloudWatch されたイベントルールは Amazon を呼び出します AppFlow。

  2. Amazon は Google Analytics データを S3 バケットに AppFlow 取り込みます。

  3. S3 バケットによってデータが取り込まれると、 のイベント通知 EventBridge が生成され、 CloudWatch イベントルールによってキャプチャされ、Amazon SQSキューに入れられます。

  4. Lambda 関数は、Amazon SQSキューからのイベントを消費し、それぞれの S3 オブジェクトを読み取り、オブジェクトを Apache Parquet 形式に変換し、変換されたオブジェクトを S3 バケットに書き込み、Glue Data Catalog AWS テーブル定義を作成または更新します。

  5. Athena クエリがテーブルに対して実行されます。

ツール

AWS ツール

  • Amazon AppFlow は、Software as a Service (SaaS) アプリケーション間でデータを安全に交換できるフルマネージド統合サービスです。

  • Amazon Athena は、標準 を使用して Amazon S3 内のデータを直接分析するのに役立つインタラクティブなクエリサービスですSQL。

  • Amazon CloudWatch は、 AWSリソースと で実行しているアプリケーションのメトリクスをAWSリアルタイムでモニタリングするのに役立ちます。

  • Amazon EventBridge は、アプリケーションをさまざまなソースからのリアルタイムデータに接続するのに役立つサーバーレスイベントバスサービスです。例えば、AWSLambda 関数、送信API先を使用したHTTP呼び出しエンドポイント、他のAWSアカウントのイベントバスなどです。

  • Amazon Simple Storage Service (Amazon S3) は、どのようなデータ量であっても、データを保存、保護、取得することを支援するクラウドベースのオブジェクトストレージサービスです。

  • Amazon Simple Queue Service (Amazon SQS) は、分散ソフトウェアシステムとコンポーネントの統合と分離に役立つ、安全で耐久性があり、利用可能なホストキューを提供します。

  • AWS Lambda は、サーバーのプロビジョニングや管理を必要とせずにコードを実行するのに役立つコンピューティングサービスです。必要に応じてコードを実行し、自動的にスケーリングするため、課金は実際に使用したコンピューティング時間に対してのみ発生します。

  • AWS Cloud Development Kit (CDK) は、コードでクラウドインフラストラクチャを定義し、 を通じてプロビジョニングするためのフレームワークですAWS CloudFormation。

  • AWS DataOps Development Kit (DDK) は、 でのデータワークフローと最新のデータアーキテクチャの構築に役立つオープンソースの開発フレームワークですAWS。

コード

このパターンのコードは、 GitHub AWS DataOps 「Development Kit (DDK)」および「Amazon AppFlow、Amazon Athena 、AWS DataOps および Development Kit リポジトリを使用した Google Analytics データの分析」で入手できます。

エピック

タスク説明必要なスキル

ソースコードを複製します。

ソースコードのクローンを作成するには、次のコマンドを実行します。

git clone https://github.com/aws-samples/aws-ddk-examples.git
DevOps エンジニア

仮想環境を作成します。

ソースコードディレクトリに移動し、以下のコマンドを実行して仮想環境を作成します。

cd google-analytics-data-using-appflow/python && python3 -m venv .venv
DevOps エンジニア

SDK の依存関係をインストールします。

次のコマンドを実行して、仮想環境を有効にし、依存関係をインストールします。

source .venv/bin/activate && pip install -r requirements.txt
DevOps エンジニア
タスク説明必要なスキル

環境を起動します。

  1. がAWSアカウントの有効な認証情報で設定AWSCLIされていることを確認します。詳細については、 AWSCLIドキュメントの「名前付きプロファイルの使用」を参照してください。

  2. cdk bootstrap --profile [AWS_PROFILE] コマンドを実行します。

DevOps エンジニア

データをデプロイします。

データパイプラインをデプロイするには、cdk deploy --profile [AWS_PROFILE] コマンドを実行します。

DevOps エンジニア
タスク説明必要なスキル

スタックのステータスを検証します。

  1. AWS CloudFormation コンソール を開きます。

  2. Stacks ページで、DdkAppflowAthenaStack スタックのステータスが CREATE_COMPLETE であることを確認します。

DevOps エンジニア

トラブルシューティング

問題ソリューション

AWS::AppFlow::Flow リソースの作成中にデプロイが失敗し、次のエラーが表示されます。Connector Profile with name ga-connection does not exist

Google Analytics 用の Amazon AppFlow コネクタを作成し、 と名付けたことを確認しますga-connection

手順については、Amazon AppFlow ドキュメントの「Google Analytics」を参照してください。

関連リソース

追加情報

AWS DDK データパイプラインは 1 つ以上のステージで構成されます。次のコード例では、AppFlowIngestionStage を使用して Google Analytics からデータを取り込み、SqsToLambdaStage を使用してデータ変換を処理し、AthenaSQLStage を使用して Athena クエリを実行します。

まず、次のコード例に示すように、データ変換ステージと取り込みステージを作成します。

appflow_stage = AppFlowIngestionStage( self, id="appflow-stage", flow_name=flow.flow_name, ) sqs_lambda_stage = SqsToLambdaStage( self, id="lambda-stage", lambda_function_props={ "code": Code.from_asset("./ddk_app/lambda_handlers"), "handler": "handler.lambda_handler", "layers": [ LayerVersion.from_layer_version_arn( self, id="layer", layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1", ) ], "runtime": Runtime.PYTHON_3_9, }, ) # Grant lambda function S3 read & write permissions bucket.grant_read_write(sqs_lambda_stage.function) # Grant Glue database & table permissions sqs_lambda_stage.function.add_to_role_policy( self._get_glue_db_iam_policy(database_name=database.database_name) ) athena_stage = AthenaSQLStage( self, id="athena-sql", query_string=[ ( "SELECT year, month, day, device, count(user_count) as cnt " f"FROM {database.database_name}.ga_sample " "GROUP BY year, month, day, device " "ORDER BY cnt DESC " "LIMIT 10; " ) ], output_location=Location( bucket_name=bucket.bucket_name, object_key="query-results/" ), additional_role_policy_statements=[ self._get_glue_db_iam_policy(database_name=database.database_name) ], )

次に、次のコード例に示すように、 DataPipelineコンストラクトを使用して、 EventBridge ルールを使用してステージを「ワイヤ」します。

( DataPipeline(self, id="ingestion-pipeline") .add_stage( stage=appflow_stage, override_rule=Rule( self, "schedule-rule", schedule=Schedule.rate(Duration.hours(1)), targets=appflow_stage.targets, ), ) .add_stage( stage=sqs_lambda_stage, # By default, AppFlowIngestionStage stage emits an event after the flow run finishes successfully # Override rule below changes that behavior to call the the stage when data lands in the bucket instead override_rule=Rule( self, "s3-object-created-rule", event_pattern=EventPattern( source=["aws.s3"], detail={ "bucket": {"name": [bucket.bucket_name]}, "object": {"key": [{"prefix": "ga-data"}]}, }, detail_type=["Object Created"], ), targets=sqs_lambda_stage.targets, ), ) .add_stage(stage=athena_stage) )

その他のコード例については、 GitHub 「Amazon 、Amazon Athena AppFlow、および AWS DataOps Development Kit を使用した Google Analytics データの分析」リポジトリを参照してください。