Building a performance efficient data pipeline

The Performance Efficiency pillar includes the ability to use computing resources efficiently to meet workload requirements, and to maintain that efficiency as demand changes and technologies evolve. This section considers some of the factors that help you improve the performance efficiency of your data pipeline.

Data partitioning and bucketing

If you need to ingest data from a large dataset into your data pipeline and the data is not distributed in a way that makes optimal use of compute resources, performance efficiency suffers. Partitioning and bucketing can help you get the best performance from your data pipeline by distributing the data and reducing the amount of data that the respective compute resources need to read.

Partitioning data

Partitioning groups data into parts and keeps related data together based on a specific value or values. For example, a customer who wants to store website clickstream data might group the data together by year, month, day, country, and so on. These grouped datasets are then stored together as files in the same partition.

Partition keys act as virtual columns. You define them at table creation time, and when data is added to Amazon S3 through services such as AWS Glue extract, transform, load (ETL) jobs or Amazon Athena, the data is grouped and persisted based on the partition key values, such as country=US.

Data can be partitioned on one or more partition columns, and the partitions are hierarchical. For example:

s3://<bucket_name>/<table_name>/country=<country value>/year=<year_value>/month=<month_value>/day=<day_value>/

Here the data is partitioned first by country and then by year, month, and day. The partition format shown is called “Hive style”, which adds a key (in this example, country) and a value (such as US) to the file path. The other partition style supported by AWS Glue is “unnamed style”; however, Hive-style partitioning is one of the most widely used methods of partitioning data for big data processing.

Example: Data Definition Language (DDL) script of a partitioned table.

CREATE EXTERNAL TABLE website_clickstream_events (
  `event_time` timestamp,
  `ip_address` string,
  `page` string,
  `action` string)
PARTITIONED BY (
  country string,
  year bigint,
  month bigint,
  day bigint)
STORED AS TEXTFILE
LOCATION 's3://<bucket name>/website_clickstream_events/'

Why partitioning?

Partitioning can help reduce the amount of data scanned, thereby improving performance and reducing the overall cost of analytics. With pushdown predicates, instead of reading the entire dataset and then filtering the data after it is loaded into memory, you can apply the filter directly on the partition metadata in the Data Catalog. Then you only list and read what you need.

Example: Partitions

s3://<bucket_name>/website_clickstream_events/country=US/year=2021/month=06/day=01/
s3://<bucket_name>/website_clickstream_events/country=US/year=2021/month=06/day=02/
s3://<bucket_name>/website_clickstream_events/country=US/year=2021/month=06/day=03/

Pre-filtering using pushdown predicates

In many cases, you can use a pushdown predicate to filter on partitions without having to list and read all the files in your dataset. Instead of reading the entire dataset and then filtering in a DynamicFrame, you can apply the filter directly on the partition metadata in the data catalog. Then you only list and read what you need into a DynamicFrame.

In the previous S3 partitions example, when you want to get data for day=01, instead of loading all the files under s3://<bucket name>/website_clickstream_events/, AWS Glue can load only the data under s3://<bucket name>/website_clickstream_events/country=US/year=2021/month=06/day=01/ by filtering partitions based on the predicates and reading the files under day=01/. This improves data processing performance and reduces the overall cost of analytics.

Example: Query

datasource = glueContext.create_dynamic_frame_from_catalog(
    database = "default",
    table_name = "website_clickstream_events",
    push_down_predicate = "year = '2021' and month = '06' and day = '01'",
    transformation_ctx = "datasource")

How to partition?

AWS Glue supports partitioning data using Spark SQL and DataFrame APIs. You can partition the data by specifying the columns by which you want to group it, such as year, month, or country.
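
For example, the following sketch (assuming a DynamicFrame named datasource produced earlier in the job, and a placeholder output path) writes the data to Amazon S3 partitioned by country, year, month, and day using the partitionKeys option:

glueContext.write_dynamic_frame.from_options(
    frame = datasource,
    connection_type = "s3",
    connection_options = {
        "path": "s3://<bucket_name>/website_clickstream_events/",
        # partitionKeys produces Hive-style folders such as country=US/year=2021/
        "partitionKeys": ["country", "year", "month", "day"]},
    format = "glueparquet",
    transformation_ctx = "datasink")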

Each file stored inside a partition should be between 128 MB and 1 GB to ensure that AWS Glue (Spark) can read and process the data efficiently. If the files are too small (KBs to a few MBs), AWS Glue spends more time on I/O, which can lead to degraded performance.
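
One way to keep output files in that range is to reduce the number of output partitions before writing. The following is a minimal sketch, assuming a DynamicFrame named datasource and a hypothetical target_files value derived from the total dataset size divided by the desired file size:

# target_files is a hypothetical value: total dataset size / desired file size (for example, 256 MB)
target_files = 200

# Convert to a Spark DataFrame and control the number of output partitions,
# which determines how many files are written and therefore their average size
df = datasource.toDF()
df.repartition(target_files).write.mode("append").parquet("s3://<bucket_name>/output/")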

You should choose partition columns that group similar records together, such as records from the same country, and that have a limited number of possible values. This characteristic is known as data cardinality. For example, if you partition by the column country, which has a limited number of distinct values (low cardinality), partitioning works well and decreases query latency. But if you partition by the column transaction date, which has a much higher number of distinct values (high cardinality), query latency increases.

Partition index

Querying tables with many partitions (tens of thousands) creates performance challenges, because AWS Glue has to scan through the partitions in the AWS Glue Data Catalog and load the partitions that are relevant to the query or filter criteria. To improve the response time when scanning tables with a large number of partitions, the AWS Glue Data Catalog provides partition indexes.

A partition index is created from a sublist of the partition keys defined on the table. For the previous website_clickstream_events table, some of the possible indexes are (country, year, month, day), (country, year), and (country).
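
A partition index can also be added to an existing table through the AWS Glue API. The following is a minimal sketch using boto3, assuming the table lives in the default database; the index name is arbitrary:

import boto3

glue = boto3.client("glue")

# Create an index on (country, year) to speed up queries that filter on those partition keys
glue.create_partition_index(
    DatabaseName = "default",
    TableName = "website_clickstream_events",
    PartitionIndex = {
        "Keys": ["country", "year"],
        "IndexName": "country_year_idx"})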

There is a soft limit on the number of partitions per table and across all AWS Glue tables in an AWS account. Soft limits can be increased by raising a support ticket.

Refer to Working with Partition Indexes to understand how to create and manage partition indexes.

File formats and data compression

Columnar data formats are used in data lake storage for faster analytics workloads, as opposed to row formats. Columnar formats significantly reduce the amount of data that needs to be fetched by reading only the columns that are relevant to the workload. Let's look at these formats in more detail.

Row vs. columnar storage

Columnar storage for database tables is an important factor in optimizing analytic query performance because it drastically reduces the overall disk I/O requirements and reduces the amount of data you need to load from disk.

The following series of images describes how columnar data storage implements efficiencies and how that translates into efficiencies when retrieving data into memory.

This first image shows how records from database tables are typically stored into disk blocks by row.

How records from database tables are typically stored into disk blocks by row

In a typical relational database table, each row contains field values for a single record. In row-wise database storage, data blocks store values sequentially for each consecutive column making up the entire row. If block size is smaller than the size of a record, storage for an entire record may take more than one block. If block size is larger than the size of a record, storage for an entire record may take less than one block, resulting in an inefficient use of disk space. In online transaction processing (OLTP) applications, most transactions involve frequently reading and writing all the values for entire records, typically one record or a small number of records at a time. As a result, row-wise storage is optimal for OLTP databases.

The next image shows how with columnar storage, the values for each column are stored sequentially into disk blocks.

With columnar storage, values for each column are stored sequentially into disk blocks.

Using columnar storage, each data block stores values of a single column for multiple rows.

In this simplified example, using columnar storage, each data block holds column field values for as many as three times as many records as row-based storage. This means that reading the same number of column field values for the same number of records requires a third of the input/output (I/O) operations compared to row-wise storage. In practice, using tables with very large numbers of columns and very large row counts, storage efficiency is even greater.

An added advantage is that, since each block holds the same type of data, block data can use a compression scheme selected specifically for the column data type, further reducing disk space and I/O.

The savings in space for storing data on disk also carries over to retrieving and then storing that data in memory. Since many database operations only need to access or operate on one or a small number of columns at a time, you can save memory space by only retrieving blocks for columns you need for a query. Where OLTP transactions typically involve most or all the columns in a row for a small number of records, data analysis queries commonly read only a few columns for a very large number of rows. This means that reading the same number of column field values for the same number of rows requires a fraction of the I/O operations and uses a fraction of the memory that would be required for processing row-wise blocks.

In practice, using tables with very large numbers of columns and very large row counts, the efficiency gains are proportionally greater. For example, suppose a table contains 100 columns. A query that uses five columns will only need to read about five percent of the data contained in the table. This savings is repeated for possibly billions or even trillions of records for large databases. In contrast, a row-wise database would read the blocks that contain the 95 unneeded columns as well.

Apache Parquet and ORC are columnar storage formats that are optimized for fast retrieval of data and are used in AWS analytics applications. Columnar storage formats have the following characteristics that make them suitable for data analysis:

  • Compression by column, with a compression algorithm selected for the column data type, saves storage space in Amazon S3 and reduces disk space and I/O during query processing.

  • Predicate pushdown in Parquet and ORC enables the query engine to fetch only the blocks it needs, improving query performance. When a query obtains specific column values from your data, it uses statistics from data block predicates, such as max/min values, to determine whether to read or skip the block.

  • Splitting of data in Parquet and ORC allows the query engine to split the reading of data across multiple readers, increasing parallelism during query processing.

To convert your existing raw data from other storage formats to Parquet or ORC, you can run CREATE TABLE AS SELECT (CTAS) queries in Athena and specify Parquet or ORC as the data storage format, or use an AWS Glue ETL job.
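
For example, a minimal AWS Glue ETL sketch (assuming the raw table is already cataloged in the default database and using a placeholder output path) that reads the raw data and rewrites it as Snappy-compressed Parquet:

# Read the raw (for example, CSV or JSON) table from the Data Catalog
raw = glueContext.create_dynamic_frame_from_catalog(
    database = "default",
    table_name = "website_clickstream_events",
    transformation_ctx = "raw")

# Rewrite the data in columnar Parquet format with Snappy compression
glueContext.write_dynamic_frame.from_options(
    frame = raw,
    connection_type = "s3",
    connection_options = {"path": "s3://<bucket_name>/website_clickstream_events_parquet/"},
    format = "glueparquet",
    format_options = {"compression": "snappy"},
    transformation_ctx = "parquet_sink")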

Compression

Compressing data helps reduce the amount of data stored in the storage layer and improves write and read performance, along with network and I/O throughput. Compared to working with uncompressed data, data compression improves overall data pipeline performance.

AWS Glue natively supports multiple compression formats. Some of the popular formats are:

  • Snappy: The default compression format for files in the Parquet data storage format. Snappy is a fast compression algorithm that provides moderate compression at speeds of 250 MB/s or more. When combined with the Parquet format, you can create highly compressed, splittable files that enable better performance and throughput.

  • ZLIB: The default compression format for files stored in the Optimized Row Columnar (ORC) data storage format in AWS Glue. ORC is the default storage format for the Apache Hive/Tez engine.

  • Gzip: One of the most widely available compression codecs. Use this compression format when you need to exchange data across a wide variety of applications and systems that may not support other formats. Gzip is CPU-intensive and is not splittable, so it cannot be processed in parallel by distributed data processing engines. It is therefore a good fit for data that is not accessed often but requires a high compression ratio, such as archival data.

  • BZip2: Provides a better compression ratio than Gzip at the cost of speed (CPU). It is a splittable format and can be processed in parallel by distributed data processing engines. It is a good option when a high compression ratio is critical. Because BZip2 is compute intensive, it is not recommended for data that is queried often.

The following are some factors to consider when choosing a compression format:

Table 1 — Factors to consider when choosing a compression format

Algorithm        Splittable?   Compression ratio   Compress/decompress speed
Gzip (DEFLATE)   No            High                Medium
Bzip2            Yes           Very high           Slow
LZO              No            Low                 Fast
Snappy           No            Low                 Very fast

Configure compression format in AWS Glue

In AWS Glue, the compression format for a file can be specified in a few ways, depending on how you access the data.

Using the AWS Glue’ dynamic data frame library

glueContext.write_dynamic_frame.from_options(
    frame = datasource1,
    connection_type = "s3",
    connection_options = {"path": "s3://s3path"},
    format = "glueparquet",
    format_options = {"compression": "snappy"},
    transformation_ctx = "datasink1")

Using PySpark

df.write \
    .option("compression", "snappy") \
    .parquet("s3://output-path")

Using Amazon Athena / Spark SQL

CREATE EXTERNAL TABLE sampleTable (
  column1 INT,
  column2 INT)
STORED AS PARQUET
LOCATION 's3://output-path/'
TBLPROPERTIES (
  'classification'='parquet',
  'compression'='snappy')

Avoid or minimize user-defined functions (UDFs)

User-defined functions (UDFs) are user-programmable routines that transform values from a single row to produce a single corresponding output value per row. UDFs give data engineers the flexibility to create new functions in higher-level languages, abstracting their lower-level language implementations. However, when working with AWS Glue or Spark code, our recommendation is to use native Spark SQL functions as much as possible and to limit the usage of UDFs to scenarios where a built-in function doesn't exist.

Spark SQL functions operate directly on the Java virtual machine (JVM) and are well integrated with both Catalyst and Tungsten. This means the functions can be optimized in the execution plan and benefit from Spark's native optimizations. UDFs, in general, increase the memory footprint because the data must be serialized and deserialized to be sent between the Spark execution engine and the JVM (plus the Python process in the case of PySpark).
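
As an illustration, the following minimal sketch (assuming a DataFrame named df with a country column; the column and function names are only for illustration) contrasts a native Spark SQL function with an equivalent Python UDF:

from pyspark.sql import functions as F
from pyspark.sql.types import StringType

# Preferred: a native Spark SQL function, optimized by Catalyst and executed on the JVM
df_native = df.withColumn("country_upper", F.upper(F.col("country")))

# Avoid when a built-in exists: a Python UDF serializes each row between the JVM
# and the Python process, increasing the memory footprint and overhead
to_upper = F.udf(lambda value: value.upper() if value else None, StringType())
df_udf = df.withColumn("country_upper", to_upper(F.col("country")))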