Build a data pipeline to ingest, transform, and analyze Google Analytics data using the AWS DataOps Development Kit
Created by Anton Kukushkin (AWS) and Rudy Puig (AWS)
Code repository: AWS DDK Examples - Analyzing Google Analytics data with Amazon AppFlow, Amazon Athena, and AWS DataOps Development Kit | Environment: PoC or pilot | Technologies: DataLakes; Analytics; DevOps; Infrastructure |
Workload: Open-source | AWS services: Amazon AppFlow; Amazon Athena; AWS CDK; AWS Lambda; Amazon S3 |
Summary
This pattern describes how to build a data pipeline to ingest, transform, and analyze Google Analytics data by using the AWS DataOps Development Kit (DDK) and other AWS services. The AWS DDK is an open-source development framework that helps you build data workflows and modern data architecture on AWS. One of the main objectives of the AWS DDK is to save you the time and effort that's typically devoted to labor-intensive data pipeline tasks, such as orchestrating pipelines, building infrastructure, and creating the DevOps behind that infrastructure. You can offload these labor-intensive tasks to AWS DDK so that you can focus on writing code and other high-value activities.
Prerequisites and limitations
Prerequisites
An active AWS account
An Amazon AppFlow connector for Google Analytics, configured
Git, installed and configured
AWS Command Line Interface (AWS CLI), installed and configured
AWS Cloud Development Kit (AWS CDK), installed
Product versions
Python 3.7 or later
pip 9.0.3 or later
Architecture
Technology stack
Amazon AppFlow
Amazon Athena
Amazon CloudWatch
Amazon EventBridge
Amazon Simple Storage Service (Amazon S3)
Amazon Simple Queue Service (Amazon SQS)
AWS DataOps Development Kit (DDK)
AWS Lambda
Target architecture
The following diagram shows the event-driven process that ingests, transforms, and analyzes Google Analytics data.
The diagram shows the following workflow:
An Amazon CloudWatch scheduled event rule invokes Amazon AppFlow.
Amazon AppFlow ingests Google Analytics data into an S3 bucket.
After Amazon AppFlow writes the data to the S3 bucket, Amazon S3 generates event notifications in EventBridge. A CloudWatch Events rule captures these notifications and puts them into an Amazon SQS queue.
A Lambda function consumes events from the Amazon SQS queue, reads the respective S3 objects, transforms the objects to Apache Parquet format, writes the transformed objects to the S3 bucket, and then creates or updates the AWS Glue Data Catalog table definition.
An Athena query runs against the table.
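The Lambda step in this workflow starts by working out which S3 objects a batch of queue messages refers to. The following is a minimal sketch of that parsing step only. It assumes that each SQS message body contains one EventBridge "Object Created" event serialized as JSON. The function name, bucket name, and object key are hypothetical, and the Parquet conversion and Data Catalog update performed by the pattern's actual handler are not shown.

```python
import json
from typing import Any, Dict, List, Tuple


def extract_s3_objects(event: Dict[str, Any]) -> List[Tuple[str, str]]:
    """Return (bucket, key) pairs for the S3 objects referenced in an SQS batch."""
    objects: List[Tuple[str, str]] = []
    for record in event.get("Records", []):
        # Assumption: each SQS message body is one EventBridge event serialized as JSON.
        body = json.loads(record["body"])
        detail = body.get("detail", {})
        bucket = detail.get("bucket", {}).get("name")
        key = detail.get("object", {}).get("key")
        # Skip messages that do not describe an S3 object.
        if bucket and key:
            objects.append((bucket, key))
    return objects


# Hypothetical batch with a single "Object Created" notification.
sample_event = {
    "Records": [
        {
            "body": json.dumps(
                {
                    "source": "aws.s3",
                    "detail-type": "Object Created",
                    "detail": {
                        "bucket": {"name": "example-bucket"},
                        "object": {"key": "ga-data/example-object"},
                    },
                }
            )
        }
    ]
}

print(extract_s3_objects(sample_event))
```

Keeping the parsing separate from the transformation makes it possible to test the event handling locally without access to Amazon S3.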
Tools
AWS tools
Amazon AppFlow is a fully managed integration service that enables you to securely exchange data between software as a service (SaaS) applications.
Amazon Athena is an interactive query service that helps you analyze data directly in Amazon S3 by using standard SQL.
Amazon CloudWatch helps you monitor the metrics of your AWS resources and the applications you run on AWS in real time.
Amazon EventBridge is a serverless event bus service that helps you connect your applications with real-time data from a variety of sources, such as AWS Lambda functions, HTTP invocation endpoints that use API destinations, or event buses in other AWS accounts.
Amazon Simple Storage Service (Amazon S3) is a cloud-based object storage service that helps you store, protect, and retrieve any amount of data.
Amazon Simple Queue Service (Amazon SQS) provides a secure, durable, and available hosted queue that helps you integrate and decouple distributed software systems and components.
AWS Lambda is a compute service that helps you run code without needing to provision or manage servers. It runs your code only when needed and scales automatically, so you pay only for the compute time that you use.
AWS Cloud Development Kit (CDK) is a framework for defining cloud infrastructure in code and provisioning it through AWS CloudFormation.
AWS DataOps Development Kit (DDK) is an open-source development framework to help you build data workflows and modern data architecture on AWS.
Code
The code for this pattern is available in the GitHub AWS DataOps Development Kit (DDK)
Epics
Task | Description | Skills required |
---|---|---|
Clone the source code. | To clone the source code, run the following command: | DevOps engineer |
Create a virtual environment. | Navigate to the source code directory, and then run the following command to create a virtual environment: | DevOps engineer |
Install the dependencies. | To activate the virtual environment and install the dependencies, run the following command: | DevOps engineer |
Task | Description | Skills required |
---|---|---|
Bootstrap the environment. | | DevOps engineer |
Deploy the data. | To deploy the data pipeline, run the | DevOps engineer |
Task | Description | Skills required |
---|---|---|
Validate stack status. | | DevOps engineer |
Troubleshooting
Issue | Solution |
---|---|
Deployment fails during the creation of an | Confirm that you created an Amazon AppFlow connector for Google Analytics and named it For instructions, see Google Analytics in the Amazon AppFlow documentation. |
Related resources
AWS DataOps Development Kit (DDK) (GitHub)
AWS DDK Examples (GitHub)
Additional information
AWS DDK data pipelines are composed of one or many stages. In the following code examples, you use AppFlowIngestionStage to ingest data from Google Analytics, SqsToLambdaStage to handle data transformation, and AthenaSQLStage to run the Athena query.
First, the data transformation and ingestion stages are created, as the following code example shows:
    appflow_stage = AppFlowIngestionStage(
        self,
        id="appflow-stage",
        flow_name=flow.flow_name,
    )
    sqs_lambda_stage = SqsToLambdaStage(
        self,
        id="lambda-stage",
        lambda_function_props={
            "code": Code.from_asset("./ddk_app/lambda_handlers"),
            "handler": "handler.lambda_handler",
            "layers": [
                LayerVersion.from_layer_version_arn(
                    self,
                    id="layer",
                    layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1",
                )
            ],
            "runtime": Runtime.PYTHON_3_9,
        },
    )
    # Grant lambda function S3 read & write permissions
    bucket.grant_read_write(sqs_lambda_stage.function)
    # Grant Glue database & table permissions
    sqs_lambda_stage.function.add_to_role_policy(
        self._get_glue_db_iam_policy(database_name=database.database_name)
    )
    athena_stage = AthenaSQLStage(
        self,
        id="athena-sql",
        query_string=[
            (
                "SELECT year, month, day, device, count(user_count) as cnt "
                f"FROM {database.database_name}.ga_sample "
                "GROUP BY year, month, day, device "
                "ORDER BY cnt DESC "
                "LIMIT 10; "
            )
        ],
        output_location=Location(
            bucket_name=bucket.bucket_name, object_key="query-results/"
        ),
        additional_role_policy_statements=[
            self._get_glue_db_iam_policy(database_name=database.database_name)
        ],
    )
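To make the aggregation in the Athena query more concrete, the following sketch reproduces its logic in plain Python. It is an illustration only, not part of the pattern's code. The function name and the sample rows are made up, and the column names are taken from the query string. Note that count(user_count) counts the rows in each group that have a non-null user_count. It does not sum the values.

```python
from typing import Any, Dict, Iterable, List, Tuple

GroupKey = Tuple[Any, Any, Any, Any]


def top_device_counts(rows: Iterable[Dict[str, Any]], limit: int = 10) -> List[Tuple[GroupKey, int]]:
    """Mimic: SELECT ..., count(user_count) AS cnt GROUP BY ... ORDER BY cnt DESC LIMIT n."""
    counts: Dict[GroupKey, int] = {}
    for row in rows:
        key = (row["year"], row["month"], row["day"], row["device"])
        # Every group appears in the result, even if all its user_count values are null.
        counts.setdefault(key, 0)
        # count(column) ignores nulls, so only non-null values increment the counter.
        if row.get("user_count") is not None:
            counts[key] += 1
    ordered = sorted(counts.items(), key=lambda item: item[1], reverse=True)
    return ordered[:limit]


# Made-up rows for illustration; real rows come from the ga_sample table.
sample_rows = [
    {"year": "2023", "month": "01", "day": "01", "device": "desktop", "user_count": 5},
    {"year": "2023", "month": "01", "day": "01", "device": "desktop", "user_count": 7},
    {"year": "2023", "month": "01", "day": "01", "device": "mobile", "user_count": 3},
    {"year": "2023", "month": "01", "day": "01", "device": "tablet", "user_count": None},
]

for group, cnt in top_device_counts(sample_rows):
    print(group, cnt)
```

If the intent were to rank devices by total users rather than by number of rows, a sum over user_count would be the aggregate to use instead. The source query uses count, so the sketch follows it.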
Next, the DataPipeline construct is used to "wire" the stages together by using EventBridge rules, as the following code example shows:
    (
        DataPipeline(self, id="ingestion-pipeline")
        .add_stage(
            stage=appflow_stage,
            override_rule=Rule(
                self,
                "schedule-rule",
                schedule=Schedule.rate(Duration.hours(1)),
                targets=appflow_stage.targets,
            ),
        )
        .add_stage(
            stage=sqs_lambda_stage,
            # By default, AppFlowIngestionStage emits an event after the flow run finishes successfully
            # The override rule below changes that behavior to call the stage when data lands in the bucket instead
            override_rule=Rule(
                self,
                "s3-object-created-rule",
                event_pattern=EventPattern(
                    source=["aws.s3"],
                    detail={
                        "bucket": {"name": [bucket.bucket_name]},
                        "object": {"key": [{"prefix": "ga-data"}]},
                    },
                    detail_type=["Object Created"],
                ),
                targets=sqs_lambda_stage.targets,
            ),
        )
        .add_stage(stage=athena_stage)
    )
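The override rule for the Lambda stage relies on EventBridge pattern matching, including a prefix filter on the object key. The following sketch imitates a small subset of that matching (exact values and prefix filters only) to show which events the rule would forward. It is a simplified illustration and not the EventBridge implementation. The function name, bucket name, and object keys are hypothetical.

```python
from typing import Any


def matches(pattern: Any, value: Any) -> bool:
    """Check a value against a simplified EventBridge-style pattern."""
    if isinstance(pattern, dict):
        # Nested pattern: every named field must be present in the event and match.
        return isinstance(value, dict) and all(
            key in value and matches(sub_pattern, value[key])
            for key, sub_pattern in pattern.items()
        )
    # Leaf pattern: a list of alternatives, any one of which may match.
    for candidate in pattern:
        if isinstance(candidate, dict) and "prefix" in candidate:
            if isinstance(value, str) and value.startswith(candidate["prefix"]):
                return True
        elif candidate == value:
            return True
    return False


# The rule's pattern in JSON form, with a hypothetical bucket name.
pattern = {
    "source": ["aws.s3"],
    "detail-type": ["Object Created"],
    "detail": {
        "bucket": {"name": ["example-bucket"]},
        "object": {"key": [{"prefix": "ga-data"}]},
    },
}


def make_event(key: str) -> dict:
    """Build a minimal, hypothetical S3 'Object Created' event for the given key."""
    return {
        "source": "aws.s3",
        "detail-type": "Object Created",
        "detail": {"bucket": {"name": "example-bucket"}, "object": {"key": key}},
    }


ingested = make_event("ga-data/example-object")
query_output = make_event("query-results/example-result.csv")

print(matches(pattern, ingested))
print(matches(pattern, query_output))
```

Because Athena writes its results to the query-results/ prefix of the same bucket, the prefix filter is what keeps those objects from being sent to the transformation stage.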
For more code examples, see the GitHub Analyzing Google Analytics data with Amazon AppFlow, Amazon Athena, and AWS DataOps Development Kit repository.