Building a reliable data pipeline
The Well-Architected Reliability pillar encompasses the ability of a workload to perform its intended function correctly and consistently when it’s expected to. This includes the ability to operate and test the workload through its total lifecycle. This section goes in depth on best practice guidance for implementing a reliable data pipeline on AWS Glue.
Error handling
Error handling is the response and recovery mechanism from errors in an application. Application developers should follow standard error handling practices according to the language (Scala/Python) they are using to code their AWS Glue jobs.
The AWS Glue developer guide lists some of the common exceptions, along with their causes and the solutions that can be used to resolve AWS Glue-related errors.
When setting up AWS Glue jobs, it is recommended that the application team use Amazon CloudWatch continuous logging to capture driver and executor logs.
There are options to set standard filters on these logs so that common Spark and Hadoop log messages are pruned out of the application log. Apart from filters, there is also an option to use a custom script logger to log application-specific messages. For more details on how to set this up, refer to Enabling Continuous Logging for AWS Glue Jobs. AWS Glue jobs can also be configured to send the Spark UI event logs to an Amazon S3 bucket, which can then be visualized in the Spark UI. Refer to Monitoring Jobs Using the Apache Spark Web UI for instructions on how to monitor AWS Glue jobs and configure the Spark UI.
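As an illustration of the custom script logger, the following minimal sketch (the job flow and messages are hypothetical) uses the logger exposed by GlueContext to emit application-specific messages that appear in the continuous CloudWatch log streams:

import sys
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.utils import getResolvedOptions

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)

# Custom script logger; messages appear in the continuous CloudWatch log stream
logger = glueContext.get_logger()
logger.info(f"{args['JOB_NAME']}: starting ingest step")   # hypothetical message
logger.warn("row count below expected threshold")          # application-specific warning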
AWS Glue also provides a feature called job run insights. This feature simplifies job debugging and optimization for your AWS Glue jobs and provides details about your job, such as the line number of errors in the application, the last run Spark action (before failure), the root cause of failure and a recommended action, and so on. The feature also allows you to supply a custom rule file to customize the error messages and recommendations.
The AWS Glue job run insights feature is enabled by default when building jobs using AWS Glue Studio. Alternatively, it can be enabled by setting the --enable-job-insights flag to true. Job run insights works with AWS Glue jobs version 2.0 and above.
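For jobs created outside AWS Glue Studio, the flag can be passed as a default job argument. The following is a minimal boto3 sketch (the job name, role, and script location are hypothetical):

import boto3

glue_client = boto3.client("glue")

# Create a job with job run insights enabled via a default argument
glue_client.create_job(
    Name="my-etl-job",                                           # hypothetical job name
    Role="arn:aws:iam::123456789012:role/MyGlueJobRole",         # hypothetical role
    Command={
        "Name": "glueetl",
        "ScriptLocation": "s3://my-scripts-bucket/my_etl_job.py" # hypothetical path
    },
    GlueVersion="3.0",
    DefaultArguments={"--enable-job-insights": "true"},
)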
Enforcing schema
Sometimes the format of the incoming files or dataset varies from the existing data schema, which can cause downstream jobs and workflows to fail. Applying a schema can help you conform the data to that schema using the schema validation mechanism explained next.
In AWS Glue, DynamicFrame infers the schema of datasets by default. You can use with_frame_schema() to supply a schema to the DynamicFrame. This is a performance optimization that exempts the DynamicFrame from having to do any schema inference.
However, in special cases where you need to enforce a schema or do some casting or mappings, such as reading numeric columns as string, the recommended approach is to use the withSchema format option.
Consider a case where one of the columns in your dataset is numeric in nature; for example, zip codes that may have a leading zero. The leading zero gets dropped when the value is read as a number. To preserve the leading zero, you can force the column to string.
Following is an example code snippet that shows how to pass the withSchema format option.
-
Create the schema that matches your data.
from pyspark.sql.types import *

schema = StructType([
    StructField("first_name", StringType()),
    StructField("last_name", StringType()),
    StructField("email", StringType()),
    StructField("code", StringType()),
])
-
Pass the JSON value of the previous schema to the DynamicFrame. The withSchema format option accepts a JSON string equivalent of the schema.

import json  # needed for json.dumps

read_datasource_s3_with_schema = glueContext.create_dynamic_frame_from_options(
    connection_type="s3",
    connection_options={"paths": input_datasource_s3},
    format="xml",
    format_options={"rowTag": "record", "withSchema": json.dumps(schema.jsonValue())},
    transformation_ctx="read_datasource_s3_with_schema")
In the previous step, we passed the JSON string equivalent of the schema using json.dumps(schema.jsonValue()). Alternatively, you can create the JSON string equivalent directly when defining the schema.
To test this, we can use the following XML data set:
<?xml version='1.0' encoding='UTF-8'?>
<dataset>
  <record>
    <first_name>Clayson</first_name>
    <last_name>Stollsteiner</last_name>
    <email>cstollsteiner@example.com</email>
    <code>1</code>
  </record>
  <record>
    <first_name>Arnoldo</first_name>
    <last_name>Presdie</last_name>
    <email>apresdie@example.com</email>
    <code>2</code>
  </record>
  <record>
    <first_name>Allie</first_name>
    <last_name>Asling</last_name>
    <email>aasling@example.com</email>
    <code>3</code>
  </record>
</dataset>
Alternatively, you can supply the JSON string of schema, as follows:
jsonStringSchema="{ \"dataType\": \"struct\", \"fields\": [{ "name\": \"first_name\", \"container\": { \"dataType\": \"string\" } }, { \"name\": \"last_name\", \"container\": { \"dataType\": \"string\" } }, { \"name\": \"email\", \"container\": { \"dataType\": \"string\" } }, { \"name\": \"code\", \"container\": { \"dataType\": \"string\" } }] }" read_datasource_s3_with_schema = glueContext.create_dynamic_frame_from_options( connection_type = "s3", connection_options = {"paths": input_datasource_s3}, format="xml", format_options= {"rowTag":"record","withSchema":jsonStringSchema}, transformation_ctx ="read_datasource_s3_with_schema")
Sharing state using AWS Glue workflow properties
When you are building complex data pipelines where one job depends on another job, it’s important to share the state information between different AWS Glue jobs. This section describes how to share state between chained jobs in an AWS Glue workflow.
There can be several reasons to share states between AWS Glue jobs in a workflow. A simple example is a job that needs a computed value from a predecessor job. This can be achieved in an AWS Glue workflow.
Workflows in AWS Glue support key-value pairs that are called run properties. When creating a workflow, you can create these workflow properties and initialize them with default values. As the job runs, you can overwrite these default values with computed values that other jobs can read.
Creating workflow properties
This section assumes that you are already familiar with the creation of an AWS Glue workflow. If you are not, refer to Creating and Building Out a Workflow Manually in AWS Glue.
Setting the properties using the AWS Glue console
If you are creating a workflow via the AWS Glue console, scroll down to the Default run properties section, choose Add property, and add your properties and default values in the key-value fields that appear.
Setting the properties using the AWS Command Line Interface (AWS CLI)
If you are creating the workflow using the AWS CLI, pass the --default-run-properties flag.

The flag accepts a map of key-value pairs that act as the default job run properties. You can use the shorthand syntax or JSON as input:

--default-run-properties {"key1":"value1","key2":"value2"}

or

--default-run-properties key1=value1,key2=value2
For additional details on the AWS CLI for AWS Glue workflow creation, refer to create-workflow.
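If you prefer to create the workflow programmatically rather than through the AWS CLI, the following minimal boto3 sketch (the workflow and property names are hypothetical) achieves the same result:

import boto3

glue_client = boto3.client("glue")

# Create a workflow with default run properties that jobs in the workflow can later overwrite
glue_client.create_workflow(
    Name="my_data_pipeline",                        # hypothetical workflow name
    Description="Example workflow with shared state",
    DefaultRunProperties={
        "control_file_path": "",                    # hypothetical property keys
        "last_processed_date": "",
    },
)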
Saving state from an AWS Glue job
Now that the workflow run properties are defined, you can access these properties from within an AWS Glue job and set the values that other jobs can read.
import sys
import boto3
from awsglue.utils import getResolvedOptions

glue_client = boto3.client("glue")

# access the workflow name and run id
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
workflow_name = args['WORKFLOW_NAME']
workflow_run_id = args['WORKFLOW_RUN_ID']

'''
the same glue job could be a part of multiple workflows,
so it is good to access the workflow name and run_id
to avoid undesired results
'''

run_properties = {}
run_properties["control_file_path"] = "fake path"
run_properties["last_processed_date"] = "fake date"

glue_client.put_workflow_run_properties(
    Name=workflow_name,
    RunId=workflow_run_id,
    RunProperties=run_properties
)
Accessing state from an AWS Glue job
You can read the properties from within an AWS Glue job:
# minimum required imports
import sys
import boto3
from awsglue.utils import getResolvedOptions

glue_client = boto3.client("glue")

# access the workflow name and run id
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'WORKFLOW_NAME', 'WORKFLOW_RUN_ID'])
workflow_name = args['WORKFLOW_NAME']
workflow_run_id = args['WORKFLOW_RUN_ID']

'''
the same glue job could be a part of multiple workflows,
so it is good to access the workflow name and run_id
to avoid undesired results
'''

run_properties = glue_client.get_workflow_run_properties(
    Name=workflow_name,
    RunId=workflow_run_id
)["RunProperties"]

# let us access 2 keys - control_file_path & last_processed_date
control_file_path = run_properties["control_file_path"]
last_processed_date = run_properties["last_processed_date"]
AWS Glue streaming ETL for near-real-time jobs
A streaming extract, transform, load (ETL) job in AWS Glue is based on Apache Spark’s Structured Streaming engine, which provides a fault-tolerant, scalable, and easy way to achieve end-to-end stream processing. A streaming job running in AWS Glue inherits most, if not all, of its best practices from an equivalent Apache Spark Structured Streaming job.
Jobs and scalability
Streaming jobs in AWS Glue can consume data from streaming sources such as Amazon Kinesis Data Streams and Apache Kafka.

In Spark, a task is the smallest individual unit of job run. Each Spark task by default maps to a single core and works on a single Spark DataFrame partition of data. If the number of available workers equals the number of partitions, Spark runs each task on a separate worker. Otherwise, it runs more than one task on a single worker (or data processing unit (DPU)).

For example, G.1X instances can support one Spark executor with four cores. Assuming there is enough capacity, the worker can run four tasks and process four partitions in parallel. Increasing the Kafka topic partitions (or Kinesis shards), in general, is a good recommendation to improve AWS Glue streaming ETL job performance. Having more partitions (preferably multiples of cores) than cores avoids idle executors, and is a good way to scale up.
However, increasing the partitions or shards may introduce more management and operational overheads. You should follow the Kafka/Kinesis recommendations on correctly sizing the partitions or shards, based on your read/write throughput requirements. Alternatively, use the following two options to increase parallelism.
-
Programmatically increase Spark partitions — The Spark API supports the repartition(numPartitions) method, which is used to increase or decrease the number of partitions of a Spark data frame. To see the current number of partitions of a data frame, you can use the rdd.getNumPartitions() method. You can pass a larger number of partitions as an argument to the repartition method on the micro-batch data frame. It creates partitions of roughly equal size by performing a full shuffle of data across all the nodes. It is recommended to try this option on your actual workload to ensure that the shuffle overhead stays small and the benefits outweigh it (see the sketch following this list).
-
Add more workers — You can configure AWS Glue ETL jobs with a higher number of workers by setting the NumberOfWorkers parameter. This is a good option if the current number of workers doesn’t have enough capacity to allocate Spark tasks for all the partitions/shards.
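The following is a minimal sketch of the first option (the function and variable names are hypothetical); it repartitions the micro-batch data frame inside a streaming batch-processing function before applying transformations:

# Hypothetical batch-processing function invoked for each streaming micro-batch
def process_batch(data_frame, batch_id):
    if data_frame.count() == 0:
        return

    # Inspect how the micro-batch is currently partitioned
    current_partitions = data_frame.rdd.getNumPartitions()

    # Repartition to a larger number (ideally a multiple of total executor cores);
    # this triggers a full shuffle, so validate the overhead on your actual workload
    target_partitions = max(current_partitions, 16)   # hypothetical target value
    repartitioned_df = data_frame.repartition(target_partitions)

    # ... apply transformations and write the repartitioned data frame ...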
In summary, for maximum parallelism, allocate the number of AWS Glue workers proportionate to the number of input partitions/shards. However, based on the workload, you can experiment with different workers (type and count) that suit your application’s processing requirements, and pick the one that best fits your needs. At the time of this writing, AWS Glue is previewing autoscaling for AWS Glue streaming jobs. You can find more details on the feature in Using Auto Scaling for AWS Glue.
Job recovery
AWS Glue streaming is based on Apache Spark’s Structured Streaming engine, which provides a fault-tolerant, scalable, and easy way to achieve end-to-end stream processing. Applications and jobs can use Spark's checkpoint mechanism for maintaining intermediate state and recovering from failure. AWS Glue streaming ETL jobs let you configure an S3 location for storing the checkpoint data. Unlike batch ETL, streaming ETL doesn't depend on bookmarks. Therefore, to reprocess a dataset, you will need to delete the checkpoint directory. Note that if the application is configured to read the latest records from the source, deleting the checkpoint will still fetch only the latest records, not the previously processed records.
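As an illustration, the following minimal sketch (the frame, function, and S3 path are hypothetical) configures the checkpoint location when processing a streaming source with glueContext.forEachBatch:

# Process the streaming source in micro-batches, persisting checkpoint data to
# Amazon S3 so the job can recover from failures
glueContext.forEachBatch(
    frame=streaming_data_frame,        # hypothetical streaming source frame
    batch_function=process_batch,      # hypothetical batch-processing function
    options={
        "windowSize": "100 seconds",
        "checkpointLocation": "s3://my-checkpoint-bucket/my-streaming-job/checkpoint/",  # hypothetical path
    },
)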
Schema handling
AWS Glue streaming jobs can infer the schema of incoming data streams. However, if you have fixed-schema data and you already know the data structure and types, predefining the schema in a Data Catalog table or the AWS Glue Schema Registry is recommended. This is because automatic schema detection restricts the ability to perform operations such as joins on the stream.
AWS Glue DataBrew for data quality
Organizations are generating more and more data these days, and it
is increasingly important for them to maintain the quality of data
to make it usable by downstream applications and analytics
services to make important business decisions. Data comes from
many different data streams or applications that may develop
slight changes in data schema and data profile over a period of
time. Data is then stored in a data lake for further consumption.
If the quality of data is not maintained and data is not properly
curated, the data in the data lake may become unusable by
consumers. The data lake may turned into a
data
swamp
AWS Glue DataBrew
You can also use AWS Glue DataBrew to evaluate the quality of your data by profiling it to understand data patterns and detect anomalies, connecting directly to your data lake, data warehouses, and databases. To ensure data is always of high quality, you need to consistently profile new data, evaluate that it meets your business rules, and alert on problems in the data so you can fix any issues. Refer to the AWS blog post Setting up automated data quality workflows and alerts using AWS Glue DataBrew and AWS Lambda for an example implementation.
Memory management for large ETL workloads
AWS Glue uses Apache Spark in its core, which provides several approaches for memory management to optimize the use of memory for workloads with large data volume. This section shows how you can combine AWS Glue capabilities and Spark best practices for handling large jobs.
Optimization with Spark Driver
Push down predicates — AWS Glue jobs let you use pushdown predicates to prune partitions that are not required from the table before the data is read. This comes in handy for tables with a large number of partitions, of which the job requires only a subset for processing.
However, note that the push_down_predicate option is applied only after the partitions are listed, and it alone doesn’t help with faster data fetch. You can use server-side partition pruning with catalogPartitionPredicate, which uses the partition index.
Following is an example usage of push_down_predicate alongside catalogPartitionPredicate.
dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
    database=dbname,
    table_name=tablename,
    transformation_ctx="tx_context_0",
    push_down_predicate="col_1>=100 and col_2 <=10",
    additional_options={"catalogPartitionPredicate": "part_1='US' and part_2='2022'"}
)
AWS Glue S3 Lister — AWS Glue provides an optimized mechanism to list files on S3 while reading data into a DynamicFrame. The AWS Glue S3 Lister can be enabled by setting the DynamicFrame’s additional_options parameter useS3ListImplementation to True. The AWS Glue S3 Lister offers an advantage over the default S3 list implementation by strictly iterating over the final list of filtered files to be read.
datasource = glue_context.create_dynamic_frame.from_catalog(
    database="tpc",
    table_name="dl_web_sales",
    push_down_predicate=partitionPredicate,
    additional_options={"useS3ListImplementation": True}
)
Grouping — AWS Glue allows you to consolidate multiple files per Spark task using the file grouping feature. Grouping files together reduces the memory footprint on the Spark driver, as well as simplifying file split orchestration. Without grouping, a Spark application must process each file using a different Spark task. Each task must then send a mapStatus object containing the location information to the Spark driver. In our testing using the AWS Glue standard worker type, we found that Spark applications processing more than roughly 650,000 files often cause the Spark driver to crash with an “out of memory” exception, as shown in the following error message:
# java.lang.OutOfMemoryError: Java heap space
# -XX:OnOutOfMemoryError="kill -9 %p"
# Executing /bin/sh -c "kill -9 12039"...
groupFiles allows you to group files within a Hive-style S3 partition (inPartition) and across S3 partitions (acrossPartition). groupSize is an optional field that allows you to configure the amount of data to be read from each file and processed by individual Spark tasks.
dyf = glueContext.create_dynamic_frame_from_options(
    "s3",
    {
        'paths': ["s3://input-s3-path/"],
        'recurse': True,
        'groupFiles': 'inPartition',
        'groupSize': '1048576'
    },
    format="json"
)
Exclusions for S3 storage classes — AWS Glue offers the ability to exclude objects based on their underlying S3 storage class. As the lifecycle of data evolves, hot data becomes cold and automatically moves to lower-cost storage based on the configured S3 lifecycle policy, so it’s important to make sure ETL jobs process the correct data. This is particularly useful when working with large datasets that span multiple S3 storage classes using the Apache Parquet file format, where Spark will try to read the schema from the file footers in these storage classes.
Amazon S3 offers a range of storage classes:
-
S3 Standard for frequently accessed data
-
S3 Standard-Infrequent Access (S3 Standard-IA) and S3 One Zone-Infrequent Access (S3 One Zone-IA) for less frequently accessed data
-
S3 Glacier Instant Retrieval for archive data that needs immediate access
-
S3 Glacier Flexible Retrieval (formerly S3 Glacier) for rarely accessed long-term data that does not require immediate access
-
Amazon S3 Glacier Deep Archive (S3 Glacier Deep Archive) for long-term archive and digital preservation with retrieval in hours at the lowest cost storage in the cloud
When reading data using DynamicFrames, you can specify a list of S3 storage classes you want to exclude. This feature uses the optimized AWS Glue S3 Lister. The following example shows how to exclude files stored in the GLACIER and DEEP_ARCHIVE storage classes.
glueContext.create_dynamic_frame.from_catalog(
    database="my_database",
    table_name="my_table_name",
    redshift_tmp_dir="",
    transformation_ctx="my_transformation_context",
    additional_options={
        "excludeStorageClasses": ["GLACIER", "DEEP_ARCHIVE"]
    }
)
The GLACIER and DEEP_ARCHIVE storage classes allow only listing files and require an asynchronous S3 restore process to read the actual data. The following is the exception you will see when you try to access Glacier and Deep Archive storage classes from your AWS Glue ETL job:
java.io.IOException: com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception:
The operation is not valid for the object's storage class
(Service: Amazon S3; Status Code: 403; Error Code: InvalidObjectState; Request ID: ), S3 Extended Request ID:
Optimize Spark queries
Inefficient queries or transformations can have a significant impact on Apache Spark driver memory utilization. Common examples include:
-
Collect is a Spark action that collects the results from workers and returns them to the driver. In many cases, the results may be very large, overwhelming the driver. It is recommended to be careful while using collect, because it can frequently cause Spark driver out of memory (OOM) exceptions. To preview the data from a previous transformation, you can rely on the take or takeSample actions instead.
-
Shared variables — Apache Spark offers two different ways to share variables between the Spark driver and executors: broadcast variables and accumulators. Broadcast variables are useful to provide a read-only copy of data or fact tables shared across Spark workers to improve map-side joins. Accumulators are useful to provide a writable copy to implement distributed counters across Spark executors. Both should be used carefully, and destroyed when no longer needed, as they can frequently result in Spark driver OOM exceptions; see the sketch following this list.
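The following minimal PySpark sketch (the data frame and lookup data are hypothetical) illustrates both points: previewing data with take instead of collect, and cleaning up a broadcast variable once it is no longer needed:

# Preview a bounded sample instead of pulling the full result set to the driver
preview_rows = transformed_df.take(10)                        # hypothetical data frame

# Broadcast a small lookup (fact) table for map-side joins
country_lookup = {"US": "United States", "DE": "Germany"}     # hypothetical lookup data
broadcast_lookup = spark.sparkContext.broadcast(country_lookup)

# ... use broadcast_lookup.value inside transformations on the executors ...

# Release executor copies, then destroy the variable when it is no longer needed
broadcast_lookup.unpersist()
broadcast_lookup.destroy()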
Manage shuffle disk capacity and memory constraints
In Apache Spark, Shuffle is a mechanism for redistributing data so that it’s grouped differently across partitions. Some of the transformations or operations that can cause a shuffle include repartition, coalesce, distinct, ByKey operations (except for counting) such as groupByKey and reduceByKey, and join operations such as cogroup and join.
The Shuffle operation is triggered when the wide transformation needs information from other partitions to complete its processing. Spark gathers the required data from each partition and combines it into a new partition. During a Shuffle phase, when data does not fit in memory, the Spark map tasks write shuffle data to a local disk that is transferred across the network and fetched by Spark reduce tasks. With AWS Glue, workers write shuffle data on local disk volumes attached to the AWS Glue workers. The following diagram illustrates how the Spark map tasks write the shuffle and spill files to local disks.
Challenges with the Shuffle operation — The process of shuffling data results in additional overhead of disk input/output (I/O), data serialization, network I/O, and increased garbage collection, making the Shuffle a complex and costly operation. It leads to the following challenges:
-
Hitting local storage limits — Due to the overhead, the Shuffle operation is often constrained by the available local disk capacity, or by data skew, which can cause straggling executors. Spark often throws a “No space left on device” or MetadataFetchFailedException error when there is not enough disk space left on the executor, and there is no recovery from the exception state. If you are using G.1X workers, a quick attempt to get around the issue would be to try the G.2X worker type, which has twice the disk capacity.
-
Co-location of storage with executors — If an executor is lost, then its shuffle files are lost as well. This leads to several task and stage retries, as Spark tries to recompute stages in order to recover lost shuffle data. Spark natively provides an external shuffle service that lets it store shuffle data independent of the life of executors. But the shuffle service itself becomes a point of failure and must always be up in order to serve shuffle data. Additionally, shuffles are still stored on local disk, which might run out of space for a large job.
To overcome these challenges, a new Spark shuffle manager is available in AWS Glue that disaggregates Spark compute and shuffle storage by utilizing Amazon S3 to store Spark shuffle and spill files. Using Amazon S3 for Spark shuffle storage lets you run data-intensive workloads much more reliably, and scale elastically. The following figure illustrates how Spark map tasks write the shuffle and spill files to the given Amazon S3 shuffle bucket. Reducer tasks consider the shuffle blocks as remote blocks, and read them from the same shuffle bucket.
Enabling AWS Glue Spark shuffle manager – AWS Glue 2.0 allows users to configure the AWS Glue Spark shuffle manager. It can be enabled using the following job parameters.
Table 1 — Job parameters for AWS Glue shuffle manager
Key | Value | Explanation
---|---|---
--write-shuffle-files-to-s3 | TRUE | This is the main flag, which tells Spark to use S3 buckets for writing and reading shuffle data.
--write-shuffle-spills-to-s3 | TRUE | This is an optional flag that lets you offload spill files to S3 buckets, which provides additional resiliency to your Spark job. This is only required for large workloads that spill a lot of data to disk. This flag is disabled by default.
--conf | spark.shuffle.glue.s3ShuffleBucket=s3://<shuffle-bucket> | This is also optional, and it specifies the S3 bucket where we write the shuffle files.
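These parameters can be supplied as job arguments when the job is started. The following is a minimal boto3 sketch (the job name and shuffle bucket are hypothetical) that enables the shuffle manager for a single run:

import boto3

glue_client = boto3.client("glue")

# Start a job run with the AWS Glue Spark shuffle manager enabled
glue_client.start_job_run(
    JobName="my-large-etl-job",                    # hypothetical job name
    Arguments={
        "--write-shuffle-files-to-s3": "true",
        "--write-shuffle-spills-to-s3": "true",
        "--conf": "spark.shuffle.glue.s3ShuffleBucket=s3://my-shuffle-bucket/",  # hypothetical bucket
    },
)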
Consider this solution in the following situations:
-
This feature is recommended when you want to ensure the reliable run of your data-intensive workloads that create a large amount of shuffle or spill data. In our experiments with TPC-DS queries, writing and reading shuffle files from Amazon S3 was marginally slower than using local disk. S3 shuffle performance is impacted by the number and size of shuffle files. For example, S3 can be slower for reads as compared to local storage if you have a large number of small shuffle files or partitions in your Spark application.
-
You can use this feature if your job frequently suffers from “No space left on device” issues.
-
You can use this feature if your job frequently suffers fetch failure issues: org.apache.spark.shuffle.MetadataFetchFailedException
-
You can use this feature if your data is skewed.
-
We recommend setting S3 bucket lifecycle policies on the shuffle bucket (spark.shuffle.glue.s3ShuffleBucket) to clean up old shuffle data.
-
This feature is available on AWS Glue 2.0 and Spark 2.4.3.