Three AWS Glue ETL job types for converting data to Apache Parquet
Created by Adnan Alvee (AWS), Karthikeyan Ramachandran (AWS), and Nith Govindasivan (AWS)
Summary
On the Amazon Web Services (AWS) Cloud, AWS Glue is a fully managed extract, transform, and load (ETL) service. AWS Glue makes it cost-effective to categorize your data, clean it, enrich it, and move it reliably between various data stores and data streams.
This pattern provides different job types in AWS Glue and uses three different scripts to demonstrate authoring ETL jobs.
You can use AWS Glue to write ETL jobs in a Python shell environment. You can also create both batch and streaming ETL jobs by using Python (PySpark) or Scala in a managed Apache Spark environment. To get you started with authoring ETL jobs, this pattern focuses on batch ETL jobs using Python shell, PySpark, and Scala. Python shell jobs are meant for workloads requiring lesser compute power. The managed Apache Spark environment is meant for workloads requiring high compute power.
Apache Parquet is built to support efficient compression and encoding schemes. It can speed up your analytics workloads because it stores data in a columnar fashion. Converting data to Parquet can save you storage space, cost, and time in the longer run. To learn more about Parquet, see the blog post Apache Parquet: How to be a hero with the open-source columnar data format
Prerequisites and limitations
Prerequisites
AWS Identity and Access Management (IAM) role (If you don’t have a role, see the Additional information section.)
Architecture
Target technology stack
AWS Glue
Amazon Simple Storage Service (Amazon S3)
Apache Parquet
Automation and scale
AWS Glue workflows support full automation of an ETL pipeline.
You can change the number of data processing units (DPUs), or worker types, to scale horizontally and vertically.
Tools
AWS services
Amazon Simple Storage Service (Amazon S3) is a cloud-based object storage service that helps you store, protect, and retrieve any amount of data.
AWS Glue is a fully managed ETL service for categorizing, cleaning, enriching, and moving your data between various data stores and data streams.
Other tools
Apache Parquet
is an open-source column-oriented data file format designed for storage and retrieval.
Configuration
Use the following settings for configuring the compute power of AWS Glue ETL. To reduce costs, use the minimal settings when you run the workload that is provided in this pattern.
Python shell – You can use 1 DPU to utilize 16 GB of memory or 0.0625 DPU to utilize 1 GB of memory. This pattern uses 0.0625 DPU, which is the default in the AWS Glue console.
Python or Scala for Spark – If you choose the Spark-related job types in the console, AWS Glue by default uses 10 workers and the G.1X worker type. This pattern uses two workers, which is the minimum number allowed, with the standard worker type, which is sufficient and cost effective.
The following table displays the different AWS Glue worker types for the Apache Spark environment. Because a Python shell job does not use the Apache Spark environment to run Python, it is not included in the table.
Standard | G.1X | G.2X | |
---|---|---|---|
vCPU | 4 | 4 | 8 |
Memory | 16 GB | 16 GB | 32 GB |
Disk space | 50 GB | 64 GB | 128 GB |
Executor per worker | 2 | 1 | 1 |
Code
For the code that is used in this pattern, including the IAM role and parameter configuration, see the Additional information section.
Epics
Task | Description | Skills required |
---|---|---|
Upload the data into a new or existing S3 bucket. | Create or use an existing S3 bucket in your account. Upload the sample_data.csv file from the Attachments section, and note the S3 bucket and prefix location. | General AWS |
Task | Description | Skills required |
---|---|---|
Create the AWS Glue job. | Under the ETL section of the AWS Glue console, add an AWS Glue job. Select the appropriate job type, AWS Glue version, and the corresponding DPU/Worker type and number of workers. For details, see the Configuration section. | Developer, cloud or data |
Change the input and output locations. | Copy the code corresponding to your AWS Glue job, and change the input and output location that you noted in the Upload the data epic. | Developer, cloud or data |
Configure the parameters. | You can use the snippets provided in the Additional information section to set parameters for your ETL job. AWS Glue uses four argument names internally:
The You must add | Developer, cloud or data |
Run the ETL job. | Run your job and check the output. Note how much space was reduced from the original file. | Developer, cloud or data |
Related resources
References
Tutorials and videos
Additional information
IAM role
When you create the AWS Glue jobs, you can use either an existing IAM role that has the permissions shown in the following code snippet or a new role.
To create a new role, use the following YAML code.
# (c) 2022 Amazon Web Services, Inc. or its affiliates. All Rights Reserved. This AWS Content is provided subject to the terms of the AWS Customer # Agreement available at https://aws.amazon.com/agreement/ or other written agreement between Customer and Amazon Web Services, Inc. AWSTemplateFormatVersion: "2010-09-09" Description: This template will setup IAM role for AWS Glue service. Resources: rGlueRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "glue.amazonaws.com" Action: - "sts:AssumeRole" ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole Policies: - PolicyName: !Sub "${AWS::StackName}-s3-limited-read-write-inline-policy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "s3:PutObject" - "s3:GetObject" Resource: "arn:aws:s3:::*/*" Tags: - Key : "Name" Value : !Sub "${AWS::StackName}" Outputs: oGlueRoleName: Description: AWS Glue IAM role Value: Ref: rGlueRole Export: Name: !Join [ ":", [ !Ref "AWS::StackName", rGlueRole ] ]
AWS Glue Python Shell
The Python code uses the Pandas and PyArrow libraries to convert data to Parquet. The Pandas library is already available. The PyArrow library is downloaded when you run the pattern, because it is a one-time run. You can use wheel files to convert PyArrow to a library and provide the file as a library package. For more information about packaging wheel files, see Providing your own Python library.
AWS Glue Python shell parameters
from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["input_loc", "output_loc"])
AWS Glue Python shell code
from io import BytesIO import pandas as pd import boto3 import os import io import site from importlib import reload from setuptools.command import easy_install install_path = os.environ['GLUE_INSTALLATION'] easy_install.main( ["--install-dir", install_path, "pyarrow"] ) reload(site) import pyarrow input_loc = "s3://bucket-name/prefix/sample_data.csv" output_loc = "s3://bucket-name/prefix/" input_bucket = input_loc.split('/', 1)[0] object_key = input_loc.split('/', 1)[1] output_loc_bucket = output_loc.split('/', 1)[0] output_loc_prefix = output_loc.split('/', 1)[1] s3 = boto3.client('s3') obj = s3.get_object(Bucket=input_bucket, Key=object_key) df = pd.read_csv(io.BytesIO(obj['Body'].read())) parquet_buffer = BytesIO() s3_resource = boto3.resource('s3') df.to_parquet(parquet_buffer, index=False) s3_resource.Object(output_loc_bucket, output_loc_prefix + 'data' + '.parquet').put(Body=parquet_buffer.getvalue())
AWS Glue Spark job with Python
To use an AWS Glue Spark job type with Python, choose Spark as the job type. Choose Spark 3.1, Python 3 with improved job startup time (Glue Version 3.0) as the AWS Glue version.
AWS Glue Python parameters
from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME", "input_loc", "output_loc"])
AWS Glue Spark job with Python code
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import * from awsglue.dynamicframe import DynamicFrame from awsglue.utils import getResolvedOptions from awsglue.job import Job sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) input_loc = "s3://bucket-name/prefix/sample_data.csv" output_loc = "s3://bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options(\ connection_type = "s3", \ connection_options = { "paths": [input_loc]}, \ format = "csv", format_options={ "withHeader": True, "separator": "," }) outputDF = glueContext.write_dynamic_frame.from_options(\ frame = inputDyf, \ connection_type = "s3", \ connection_options = {"path": output_loc \ }, format = "parquet")
For a large number of compressed big files (for example, 1,000 files that are each about 3 MB), use the compressionType
parameter with the recurse
parameter to read all the files that are available within the prefix, as shown in the following code.
input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", }, format = "csv", format_options={"withHeader": True,"separator": ","} )
For a large number of compressed small files (for example, 1,000 files that are each about 133 KB), use the groupFiles
parameter, along with both the compressionType
and the recurse
parameters. The groupFiles
parameter groups small files into multiple big files, and the groupSize
parameter controls the grouping to the specified size in bytes (for example, 1 MB). The following code snippet provides an example of using these parameters within the code.
input_loc = "bucket-name/prefix/" output_loc = "bucket-name/prefix/" inputDyf = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": [input_loc], "compressionType":"gzip","recurse" :"True", "groupFiles" :"inPartition", "groupSize" :"1048576", }, format = "csv", format_options={"withHeader": True,"separator": ","} )
Without any change in the worker nodes, these settings enable the AWS Glue job to read multiple files (large or small, with or without compression) and write them to the target in Parquet format.
AWS Glue Spark job with Scala
To use an AWS Glue Spark job type with Scala, choose Spark as the job type and Language as Scala. Choose Spark 3.1, Scala 2 with improved job startup time (Glue Version 3.0) as the AWS Glue version. To save on storage space, the following AWS Glue with Scala sample also uses the applyMapping
feature to convert data types.
AWS Glue Scala parameters
import com.amazonaws.services.glue.util.GlueArgParser val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME", "inputLoc", "outputLoc").toArray)
AWS Glue Spark job with Scala code
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.MappingSpec import com.amazonaws.services.glue.DynamicFrame import com.amazonaws.services.glue.errors.CallSite import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueScalaApp { def main(sysArgs: Array[String]) { @transient val spark: SparkContext = SparkContext.getOrCreate() val glueContext: GlueContext = new GlueContext(spark) val inputLoc = "s3://bucket-name/prefix/sample_data.csv" val outputLoc = "s3://bucket-name/prefix/" val readCSV = glueContext.getSource("csv", JsonOptions(Map("paths" -> Set(inputLoc)))).getDynamicFrame() val applyMapping = readCSV.applyMapping(mappings = Seq(("_c0", "string", "date", "string"), ("_c1", "string", "sales", "long"), ("_c2", "string", "profit", "double")), caseSensitive = false) val formatPartition = applyMapping.toDF().coalesce(1) val dynamicFrame = DynamicFrame(formatPartition, glueContext) val dataSink = glueContext.getSinkWithFormat( connectionType = "s3", options = JsonOptions(Map("path" -> outputLoc )), transformationContext = "dataSink", format = "parquet").writeDynamicFrame(dynamicFrame) } }
Attachments
To access additional content that is associated with this document, unzip the following file: attachment.zip