
Build a data pipeline to ingest, transform, and analyze Google Analytics data using the AWS DataOps Development Kit

Created by Anton Kukushkin (AWS) and Rudy Puig (AWS)

Code repository: AWS DDK Examples - Analyzing Google Analytics data with Amazon AppFlow, Amazon Athena, and AWS DataOps Development Kit

Environment: PoC or pilot

Technologies: DataLakes; Analytics; DevOps; Infrastructure

Workload: Open-source

AWS services: Amazon AppFlow; Amazon Athena; AWS CDK; AWS Lambda; Amazon S3

Summary

This pattern describes how to build a data pipeline to ingest, transform, and analyze Google Analytics data by using the AWS DataOps Development Kit (DDK) and other AWS services. The AWS DDK is an open-source development framework that helps you build data workflows and modern data architecture on AWS. One of the main objectives of the AWS DDK is to save you the time and effort that's typically devoted to labor-intensive data pipeline tasks, such as orchestrating pipelines, building infrastructure, and creating the DevOps behind that infrastructure. You can offload these labor-intensive tasks to AWS DDK so that you can focus on writing code and other high-value activities.

Prerequisites and limitations

Prerequisites

  • An active AWS account

  • An Amazon AppFlow connector for Google Analytics named ga-connection (for instructions, see Google Analytics in the Amazon AppFlow documentation)

Product versions

  • Python 3.7 or later

  • pip 9.0.3 or later

Architecture

Technology stack

  • Amazon AppFlow

  • Amazon Athena

  • Amazon CloudWatch

  • Amazon EventBridge

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon Simple Queue Service (Amazon SQS)

  • AWS DataOps Development Kit (DDK)

  • AWS Lambda

Target architecture

The following diagram shows the event-driven process that ingests, transforms, and analyzes Google Analytics data.

Architecture diagram

The diagram shows the following workflow:

  1. An Amazon CloudWatch scheduled event rule invokes Amazon AppFlow.

  2. Amazon AppFlow ingests Google Analytics data into an S3 bucket.

  3. After the data is ingested into the S3 bucket, Amazon S3 generates event notifications in EventBridge, which are captured by an EventBridge rule and then put into an Amazon SQS queue.

  4. A Lambda function consumes events from the Amazon SQS queue, reads the respective S3 objects, transforms the objects to Apache Parquet format, writes the transformed objects to the S3 bucket, and then creates or updates the AWS Glue Data Catalog table definition. (A sketch of such a handler follows this list.)

  5. An Athena query runs against the table.
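The transformation Lambda function in step 4 is included in the pattern's repository. The following is only a minimal sketch of what such a handler can look like, assuming JSON Lines source objects, EventBridge Object Created events delivered through the SQS queue, the AWS SDK for pandas (awswrangler) layer that the stack attaches to the function, and hypothetical database, table, and prefix names:

import json

import awswrangler as wr

# Hypothetical names for illustration; the repository's handler defines its own.
GLUE_DATABASE = "ga_database"
GLUE_TABLE = "ga_sample"
OUTPUT_PREFIX = "ga-data-parquet/"


def lambda_handler(event, context):
    for record in event["Records"]:
        # Each SQS record body carries an EventBridge "Object Created" event.
        detail = json.loads(record["body"])["detail"]
        bucket = detail["bucket"]["name"]
        key = detail["object"]["key"]

        # Read the raw Google Analytics export, convert it to Parquet, and
        # create or update the table definition in the AWS Glue Data Catalog.
        df = wr.s3.read_json(f"s3://{bucket}/{key}", lines=True)
        wr.s3.to_parquet(
            df=df,
            path=f"s3://{bucket}/{OUTPUT_PREFIX}",
            dataset=True,
            database=GLUE_DATABASE,
            table=GLUE_TABLE,
            mode="append",
        )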

Tools

AWS tools

  • Amazon AppFlow is a fully managed integration service that enables you to securely exchange data between software as a service (SaaS) applications and AWS services.

  • Amazon Athena is an interactive query service that helps you analyze data directly in Amazon S3 by using standard SQL.

  • Amazon CloudWatch helps you monitor the metrics of your AWS resources and the applications you run on AWS in real time.

  • Amazon EventBridge is a serverless event bus service that helps you connect your applications with real-time data from a variety of sources, such as AWS Lambda functions, HTTP invocation endpoints using API destinations, or event buses in other AWS accounts.

  • Amazon Simple Storage Service (Amazon S3) is a cloud-based object storage service that helps you store, protect, and retrieve any amount of data.

  • Amazon Simple Queue Service (Amazon SQS) provides a secure, durable, and available hosted queue that helps you integrate and decouple distributed software systems and components.

  • AWS Lambda is a compute service that helps you run code without needing to provision or manage servers. It runs your code only when needed and scales automatically, so you pay only for the compute time that you use.

  • AWS Cloud Development Kit (CDK) is a framework for defining cloud infrastructure in code and provisioning it through AWS CloudFormation.

  • AWS DataOps Development Kit (DDK) is an open-source development framework to help you build data workflows and modern data architecture on AWS.

Code

The code for this pattern is available in the GitHub AWS DataOps Development Kit (DDK) and Analyzing Google Analytics data with Amazon AppFlow, Amazon Athena, and AWS DataOps Development Kit repositories.

Epics

Task | Description | Skills required

Clone the source code.

To clone the source code, run the following command:

git clone https://github.com/aws-samples/aws-ddk-examples.git

DevOps engineer

Create a virtual environment.

Navigate to the source code directory, and then run the following command to create a virtual environment:

cd google-analytics-data-using-appflow/python && python3 -m venv .venv

DevOps engineer

Install the dependencies.

To activate the virtual environment and install the dependencies, run the following command:

source .venv/bin/activate && pip install -r requirements.txt

DevOps engineer

Task | Description | Skills required

Bootstrap the environment.

  1. Confirm that the AWS CLI is set up with valid credentials for your AWS account. For more information, see Using named profiles in the AWS CLI documentation.

  2. Run the cdk bootstrap --profile [AWS_PROFILE] command.

DevOps engineer

Deploy the data pipeline.

To deploy the data pipeline, run the cdk deploy --profile [AWS_PROFILE] command.

DevOps engineer

Task | Description | Skills required

Validate stack status.

  1. Open the AWS CloudFormation console.

  2. On the Stacks page, confirm that the status of the stack DdkAppflowAthenaStack is CREATE_COMPLETE. (A scripted alternative follows this table.)

DevOps engineer
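If you prefer to validate from code instead of the console, a minimal boto3 sketch such as the following checks the stack status; it assumes the default stack name that this pattern deploys:

import boto3

cloudformation = boto3.client("cloudformation")
stacks = cloudformation.describe_stacks(StackName="DdkAppflowAthenaStack")["Stacks"]
print(stacks[0]["StackStatus"])  # Expect CREATE_COMPLETE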

Troubleshooting

Issue | Solution

Deployment fails during the creation of an AWS::AppFlow::Flow resource, and you receive the following error: Connector Profile with name ga-connection does not exist

Confirm that you created an Amazon AppFlow connector for Google Analytics and named it ga-connection.

For instructions, see Google Analytics in the Amazon AppFlow documentation.
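You can also confirm from code that the connector profile exists. The following minimal boto3 sketch uses the ga-connection name from this pattern; adjust it if you named your connector differently:

import boto3

appflow = boto3.client("appflow")
profiles = appflow.describe_connector_profiles(
    connectorProfileNames=["ga-connection"]
)["connectorProfileDetails"]
print(profiles)  # An empty list means the ga-connection profile does not exist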

Related resources

Additional information

AWS DDK data pipelines are composed of one or more stages. In the following code examples, you use AppFlowIngestionStage to ingest data from Google Analytics, SqsToLambdaStage to handle data transformation, and AthenaSQLStage to run the Athena query.

First, the data ingestion and transformation stages are created, as the following code example shows:

appflow_stage = AppFlowIngestionStage(
    self,
    id="appflow-stage",
    flow_name=flow.flow_name,
)
sqs_lambda_stage = SqsToLambdaStage(
    self,
    id="lambda-stage",
    lambda_function_props={
        "code": Code.from_asset("./ddk_app/lambda_handlers"),
        "handler": "handler.lambda_handler",
        "layers": [
            LayerVersion.from_layer_version_arn(
                self,
                id="layer",
                layer_version_arn=f"arn:aws:lambda:{self.region}:336392948345:layer:AWSDataWrangler-Python39:1",
            )
        ],
        "runtime": Runtime.PYTHON_3_9,
    },
)
# Grant the Lambda function S3 read & write permissions
bucket.grant_read_write(sqs_lambda_stage.function)
# Grant AWS Glue database & table permissions
sqs_lambda_stage.function.add_to_role_policy(
    self._get_glue_db_iam_policy(database_name=database.database_name)
)
athena_stage = AthenaSQLStage(
    self,
    id="athena-sql",
    query_string=[
        (
            "SELECT year, month, day, device, count(user_count) as cnt "
            f"FROM {database.database_name}.ga_sample "
            "GROUP BY year, month, day, device "
            "ORDER BY cnt DESC "
            "LIMIT 10; "
        )
    ],
    output_location=Location(
        bucket_name=bucket.bucket_name, object_key="query-results/"
    ),
    additional_role_policy_statements=[
        self._get_glue_db_iam_policy(database_name=database.database_name)
    ],
)

Next, the DataPipeline construct is used to "wire" the stages together by using EventBridge rules, as the following code example shows:

(
    DataPipeline(self, id="ingestion-pipeline")
    .add_stage(
        stage=appflow_stage,
        override_rule=Rule(
            self,
            "schedule-rule",
            schedule=Schedule.rate(Duration.hours(1)),
            targets=appflow_stage.targets,
        ),
    )
    .add_stage(
        stage=sqs_lambda_stage,
        # By default, the AppFlowIngestionStage stage emits an event after the flow run finishes successfully.
        # The override rule below changes that behavior to invoke the stage when data lands in the bucket instead.
        override_rule=Rule(
            self,
            "s3-object-created-rule",
            event_pattern=EventPattern(
                source=["aws.s3"],
                detail={
                    "bucket": {"name": [bucket.bucket_name]},
                    "object": {"key": [{"prefix": "ga-data"}]},
                },
                detail_type=["Object Created"],
            ),
            targets=sqs_lambda_stage.targets,
        ),
    )
    .add_stage(stage=athena_stage)
)
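If you want to run the same analysis ad hoc, outside the pipeline, the following minimal boto3 sketch issues the query directly to Athena. The database name and output bucket are placeholders; substitute the names that your deployed stack created:

import time

import boto3

# Placeholder names; use the database and bucket created by the stack.
DATABASE = "ga_database"
OUTPUT_LOCATION = "s3://<bucket-name>/query-results/"

QUERY = (
    "SELECT year, month, day, device, count(user_count) as cnt "
    "FROM ga_sample "
    "GROUP BY year, month, day, device "
    "ORDER BY cnt DESC "
    "LIMIT 10;"
)

athena = boto3.client("athena")
execution_id = athena.start_query_execution(
    QueryString=QUERY,
    QueryExecutionContext={"Database": DATABASE},
    ResultConfiguration={"OutputLocation": OUTPUT_LOCATION},
)["QueryExecutionId"]

# Poll until the query finishes, then print the result rows.
while True:
    state = athena.get_query_execution(QueryExecutionId=execution_id)["QueryExecution"]["Status"]["State"]
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        break
    time.sleep(1)

if state == "SUCCEEDED":
    results = athena.get_query_results(QueryExecutionId=execution_id)
    for row in results["ResultSet"]["Rows"]:
        print([col.get("VarCharValue") for col in row["Data"]])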

For more code examples, see the GitHub Analyzing Google Analytics data with Amazon AppFlow, Amazon Athena, and AWS DataOps Development Kit repository.