Parallelize tasks

To optimize performance, it's important to parallelize tasks for data loads and transformations. As we discussed in Key topics in Apache Spark, the number of resilient distributed dataset (RDD) partitions is important, because it determines the degree of parallelism. Each task that Spark creates corresponds to an RDD partition on a 1:1 basis. To achieve the best performance, you need to understand how the number of RDD partitions is determined and how that number is optimized.

If you do not have enough parallelism, the following symptoms will be recorded in CloudWatch metrics and the Spark UI.

CloudWatch metrics

Check the CPU Load and Memory Utilization. If some executors are not processing during a phase of your job, it's appropriate to improve parallelism. In this case, during the visualized timeframe, Executor 1 was performing a task, but the remaining executors (2, 3, and 4) were not. You can infer that those executors were not assigned tasks by the Spark driver.

Graph showing driver and only one executor.

Spark UI

On the Stage tab in the Spark UI, you can see the number of tasks in a stage. In this case, Spark has performed only one task.

Additionally, the event timeline shows Executor 1 processing one task. This means that the work in this stage was performed entirely on one executor, while the others were idle.

Event timeline showing only one task.

If you observe these symptoms, try the following solutions for each data source.

Parallelize data load from Amazon S3

To parallelize data loads from Amazon S3, first check the default number of partitions. You can then manually determine a target number of partitions, but be sure to avoid having too many partitions.

Determine the default number of partitions

For Amazon S3, the initial number of Spark RDD partitions (each of which corresponds to a Spark task) is determined by features of your Amazon S3 dataset (for example, format, compression, and size). When you create an AWS Glue DynamicFrame or a Spark DataFrame from CSV objects stored in Amazon S3, the initial number of RDD partitions (NumPartitions) can be approximately calculated as follows:

  • Object size <= 64 MB: NumPartitions = Number of Objects

  • Object size > 64 MB: NumPartitions = Total Object Size / 64 MB

  • Unsplittable (gzip): NumPartitions = Number of Objects

As discussed in the Reduce the amount of data scan section, Spark divides large S3 objects into splits that can be processed in parallel. When an object is larger than the split size, Spark splits the object and creates an RDD partition (and task) for each split. Spark's split size depends on your data format and runtime environment, but 64 MB is a reasonable starting approximation. Some objects are compressed using unsplittable compression formats such as gzip, so Spark cannot split them.

The NumPartitions value might vary depending on your data format, compression, AWS Glue version, number of AWS Glue workers, and Spark configuration.
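The approximation above can be sketched as a small helper. This is a rough estimate only, not how Spark computes splits internally. The function name, its parameters, and the per-object rounding are assumptions made for illustration; the 64 MB default comes from the approximation in this section.

```python
import math


def estimate_num_partitions(object_sizes_mb, splittable=True, split_size_mb=64):
    """Roughly estimate the initial number of RDD partitions for S3 objects.

    object_sizes_mb: sizes of the individual S3 objects, in MB.
    splittable:      False for formats such as gzip that cannot be split.
    split_size_mb:   assumed split size (64 MB is only an approximation).
    """
    if not splittable:
        # Unsplittable objects: one partition per object, regardless of size.
        return len(object_sizes_mb)
    # Splittable objects: small objects yield one partition each,
    # larger objects yield roughly size / split_size partitions.
    return sum(max(1, math.ceil(size / split_size_mb)) for size in object_sizes_mb)


# Three small CSV objects: one partition per object.
print(estimate_num_partitions([10, 20, 30]))
# One 10 GB gzip object: a single partition.
print(estimate_num_partitions([10240], splittable=False))
```

Treat the result as a starting point and confirm the real value with df.rdd.getNumPartitions() or the Spark UI.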

For example, when you load a single 10 GB csv.gz object using a Spark DataFrame, the Spark driver creates only one RDD partition (NumPartitions=1) because gzip is unsplittable. This places a heavy load on one Spark executor, and no tasks are assigned to the remaining executors, as described in the following figure.

Check the actual number of tasks (NumPartitions) for the stage on the Spark Web UI Stage tab, or run df.rdd.getNumPartitions() in your code to check the parallelism.

When encountering a 10 GB gzip file, examine whether the system generating that file can generate it in a splittable format. If this isn't an option, you might need to scale cluster capacity to process the file. To run transforms efficiently on the data that you loaded, you will need to rebalance your RDD across the workers in your cluster by using repartition.

Manually determine a target number of partitions

Depending on the properties of your data and Spark's implementation of certain functionalities, you might end up with a low NumPartitions value even though the underlying work can still be parallelized. If NumPartitions is too small, run df.repartition(N) to increase the number of partitions so that the processing can be distributed across multiple Spark executors.

In this case, running df.repartition(100) will increase NumPartitions from 1 to 100, creating 100 partitions of your data, each with a task that can be assigned to the other executors.

The repartition(N) operation distributes the entire dataset approximately equally across partitions (10 GB / 100 partitions ≈ 100 MB per partition), which avoids skewing data toward certain partitions.
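A minimal sketch of the check-then-repartition pattern described above. The helper name ensure_min_partitions is hypothetical; it relies only on df.rdd.getNumPartitions() and df.repartition(N), which are mentioned in this section.

```python
def ensure_min_partitions(df, min_partitions):
    """Return df with at least min_partitions partitions.

    df is expected to be a Spark DataFrame. If it already has enough
    partitions, it is returned unchanged to avoid an unnecessary shuffle.
    """
    current = df.rdd.getNumPartitions()
    if current < min_partitions:
        # repartition() shuffles the data so that rows are spread
        # approximately evenly across the new partitions.
        return df.repartition(min_partitions)
    return df
```

For the 10 GB csv.gz example, calling ensure_min_partitions(df, 100) right after the load would raise NumPartitions from 1 to 100 before any heavy transforms run.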

Note

When a shuffle operation such as join is run, the number of partitions is dynamically increased or decreased depending on the value of spark.sql.shuffle.partitions or spark.default.parallelism. This facilitates a more efficient exchange of data between Spark executors. For more information, see the Spark documentation.

Your goal when determining the target number of partitions is to maximize the use of the provisioned AWS Glue workers. The number of AWS Glue workers and the number of Spark tasks are related through the number of vCPUs. Spark supports one task for each vCPU core. In AWS Glue version 3.0 or later, you can calculate a target number of partitions by using the following formula.

# Calculate NumPartitions by WorkerType
numExecutors = (NumberOfWorkers - 1)
numSlotsPerExecutor =
    4 if WorkerType is G.1X
    8 if WorkerType is G.2X
    16 if WorkerType is G.4X
    32 if WorkerType is G.8X
NumPartitions = numSlotsPerExecutor * numExecutors

# Example: Glue 4.0 / G.1X / 10 Workers
numExecutors = ( 10 - 1 ) = 9    # 1 Worker reserved on Spark Driver
numSlotsPerExecutor = 4          # G.1X has 4 vCpu core ( Glue 3.0 or later )
NumPartitions = 9 * 4 = 36

In this example, each G.1X worker provides four vCPU cores to a Spark executor (spark.executor.cores = 4). Spark supports one task for each vCPU core, so G.1X Spark executors can run four tasks simultaneously (numSlotsPerExecutor). This number of partitions makes full use of the cluster if tasks take an equal amount of time. However, some tasks will take longer than others, creating idle cores. If this happens, consider multiplying NumPartitions by 2 or 3 to break up and efficiently schedule the bottleneck tasks.
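The formula can also be written as a runnable helper. This is a sketch for AWS Glue 3.0 or later using the slot counts listed in this section; the function name and the multiplier parameter are assumptions added for illustration.

```python
# Task slots (vCPU cores) per executor, by worker type (AWS Glue 3.0 or later).
SLOTS_PER_EXECUTOR = {"G.1X": 4, "G.2X": 8, "G.4X": 16, "G.8X": 32}


def target_num_partitions(number_of_workers, worker_type, multiplier=1):
    """Return a target partition count for the provisioned workers.

    One worker is reserved for the Spark driver, so only
    (number_of_workers - 1) workers run executors. Use multiplier=2 or 3
    when task durations are uneven.
    """
    if number_of_workers < 2:
        raise ValueError("At least two workers are needed (one is the driver).")
    num_executors = number_of_workers - 1
    return num_executors * SLOTS_PER_EXECUTOR[worker_type] * multiplier


# Example: G.1X with 10 workers gives 9 executors * 4 slots.
print(target_num_partitions(10, "G.1X"))
```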

Too many partitions

An excessive number of partitions creates an excessive number of tasks. This causes a heavy load on the Spark driver because of overhead related to distributed processing, such as management tasks and data exchange between Spark executors.

If the number of partitions in your job is substantially larger than your target number of partitions, consider reducing the number of partitions. You can reduce partitions by using the following options:

  • If your file sizes are very small, use AWS Glue groupFiles. You can reduce the excessive parallelism resulting from the launch of an Apache Spark task to process each file.

  • Use coalesce(N) to merge partitions together. This is a low-cost process. When reducing the number of partitions, coalesce(N) is preferred over repartition(N), because repartition(N) performs a shuffle to equalize the number of records in each partition. That increases costs and management overhead.

  • Use Spark 3.x Adaptive Query Execution. As discussed in the Key topics in Apache Spark section, Adaptive Query Execution provides a function to automatically coalesce the number of partitions. You can use this approach when you can't know the number of partitions until you perform the execution.
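The coalesce and Adaptive Query Execution options above can be sketched as follows. The helper name shrink_partitions and the constant name are hypothetical; the two configuration keys are standard Spark 3.x settings, but confirm that they apply to your AWS Glue version before relying on them.

```python
# Spark 3.x settings that let Adaptive Query Execution coalesce
# shuffle partitions automatically at run time.
AQE_COALESCE_SETTINGS = {
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}


def shrink_partitions(df, target_partitions):
    """Merge partitions with coalesce() when df has more than the target.

    coalesce() avoids a full shuffle, so it is cheaper than repartition()
    when the goal is only to reduce the partition count.
    """
    current = df.rdd.getNumPartitions()
    if current > target_partitions:
        return df.coalesce(target_partitions)
    return df
```

In a job, you might apply the settings with spark.conf.set(key, value) for each entry before running queries, and call shrink_partitions(df, 36) before a write that would otherwise produce many small files.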

Parallelize data load from JDBC

The number of Spark RDD partitions is determined by configuration. Note that by default only a single task is run to scan an entire source dataset through a SELECT query.

Both AWS Glue DynamicFrames and Spark DataFrames support parallelized JDBC data load across multiple tasks. This is done by using WHERE predicates to split one SELECT query into multiple queries. To parallelize reads from JDBC, configure the following options:

  • For AWS Glue DynamicFrame, set hashfield (or hashexpression) and hashpartitions. To learn more, see Reading from JDBC tables in parallel.

    connection_mysql8_options = {
        "url": "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test",
        "dbtable": "medicare_tb",
        "user": "test",
        "password": "XXXXXXXXX",
        "hashexpression": "id",
        "hashpartitions": "10"
    }

    datasource0 = glueContext.create_dynamic_frame.from_options(
        'mysql',
        connection_options=connection_mysql8_options,
        transformation_ctx="datasource0"
    )
  • For Spark DataFrame, set numPartitions, partitionColumn, lowerBound, and upperBound. To learn more, see JDBC To Other Databases.

    df = spark.read \
        .format("jdbc") \
        .option("url", "jdbc:mysql://XXXXXXXXXX.XXXXXXX.us-east-1.rds.amazonaws.com:3306/test") \
        .option("dbtable", "medicare_tb") \
        .option("user", "test") \
        .option("password", "XXXXXXXXXX") \
        .option("partitionColumn", "id") \
        .option("numPartitions", "10") \
        .option("lowerBound", "0") \
        .option("upperBound", "1141455") \
        .load()

    df.write.format("json").save("s3://bucket_name/Tests/sparkjdbc/with_parallel/")
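To make the WHERE-predicate splitting concrete, the following sketch generates predicates similar to those Spark derives from partitionColumn, lowerBound, upperBound, and numPartitions. It is a simplified approximation for integer columns, not Spark's actual implementation, and the function name is hypothetical. Note that the bounds only decide how the range is divided; rows outside the bounds are still read by the first and last partitions.

```python
def jdbc_partition_predicates(column, lower_bound, upper_bound, num_partitions):
    """Approximate the per-task WHERE predicates for a partitioned JDBC read."""
    if num_partitions < 2:
        raise ValueError("num_partitions must be at least 2 to split the query.")
    # Width of each range (integer arithmetic, simplified).
    stride = upper_bound // num_partitions - lower_bound // num_partitions
    predicates = []
    current = lower_bound
    for i in range(num_partitions):
        lower = f"{column} >= {current}" if i > 0 else None
        current += stride
        upper = f"{column} < {current}" if i < num_partitions - 1 else None
        if lower is None:
            # First partition also picks up NULLs and values below lower_bound.
            predicates.append(f"{upper} OR {column} IS NULL")
        elif upper is None:
            # Last partition picks up everything above the final boundary.
            predicates.append(lower)
        else:
            predicates.append(f"{lower} AND {upper}")
    return predicates


# Same settings as the DataFrame example: 10 partitions over id in [0, 1141455].
for predicate in jdbc_partition_predicates("id", 0, 1141455, 10):
    print(predicate)
```

Because each predicate becomes its own query and task, a partition column with evenly distributed values keeps the tasks balanced.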

Parallelize data load from DynamoDB when using the ETL connector

The number of Spark RDD partitions is determined by the dynamodb.splits parameter. To parallelize reads from Amazon DynamoDB, increase the value of dynamodb.splits.
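A minimal sketch of connection options that set dynamodb.splits for the AWS Glue DynamoDB ETL connector. The helper name and the table name in the usage note are hypothetical, and the split count shown is only an example value that you would tune for your own workers.

```python
def dynamodb_read_options(table_name, splits):
    """Build connection options for a parallel DynamoDB read in AWS Glue.

    splits controls how many Spark RDD partitions (and therefore tasks)
    are used to scan the table.
    """
    return {
        "dynamodb.input.tableName": table_name,
        "dynamodb.splits": str(splits),
    }


options = dynamodb_read_options("my_table", 36)  # "my_table" is a placeholder
print(options)
```

In a Glue job, these options would be passed as connection_options to glueContext.create_dynamic_frame.from_options with connection_type="dynamodb".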

Parallelize data load from Kinesis Data Streams

The number of Spark RDD partitions is determined by the number of shards in the source Amazon Kinesis Data Streams data stream. If you have only a few shards in your data stream, there will be only a few Spark tasks. This can result in low parallelism in downstream processes. To parallelize reads from Kinesis Data Streams, configure the following options:

  • Increase the number of shards to obtain more parallelism when loading data from Kinesis Data Streams.

  • If your logic in the micro-batch is complex enough, consider repartitioning the data at the beginning of the batch, after dropping unneeded columns.

For more information, see Best practices to optimize cost and performance for AWS Glue streaming ETL jobs.
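The micro-batch advice above (drop unneeded columns first, then repartition) might look like the following sketch. The function name and its parameters are assumptions for illustration; batch_df is expected to be the Spark DataFrame that your batch function receives for each micro-batch.

```python
def prepare_micro_batch(batch_df, needed_columns, num_partitions):
    """Trim a micro-batch and spread it across more partitions.

    Selecting only the needed columns before repartitioning keeps the
    shuffle small, because less data is exchanged between executors.
    """
    trimmed = batch_df.select(*needed_columns)
    return trimmed.repartition(num_partitions)
```

Inside a batch function, you might call prepare_micro_batch(data_frame, ["id", "value"], 36) as the first step, where the column names and partition count are placeholders for your own values.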

Parallelize tasks after data load

To parallelize tasks after data load, increase the number of RDD partitions by using the following options:

  • Repartition data to generate a greater number of partitions, especially right after initial load if the load itself could not be parallelized.

    Call repartition() either on DynamicFrame or DataFrame, specifying the number of partitions. A good rule of thumb is two or three times the number of cores available.

    However, when writing a partitioned table, this can lead to an explosion of files (each partition can potentially generate a file into each table partition). To avoid this, you can repartition your DataFrame by column. This uses the table partition columns so the data is organized before writing. You can specify a higher number of partitions without getting small files on the table partitions. However, be careful to avoid data skew, in which some partition values end up with most of the data and delay the completion of the task.

  • When there are shuffles, increase the spark.sql.shuffle.partitions value. This also can help with any memory issues when shuffling.

    When you have more than 2,000 shuffle partitions (that is, 2,001 or more), Spark uses a compressed memory format. If you have a number close to that, you might want to set the spark.sql.shuffle.partitions value over that limit to get the more efficient representation.
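The repartition guidance above can be sketched as a helper that applies the two-to-three-times-cores rule of thumb and repartitions by the table partition columns. The function name and parameters are hypothetical; df is expected to be a Spark DataFrame.

```python
def repartition_by_table_columns(df, total_cores, partition_cols, multiplier=3):
    """Repartition df by the table partition columns before a partitioned write.

    The partition count follows the rule of thumb of two or three times
    the number of available cores. Rows with the same partition-column
    values land in the same Spark partition, which limits the number of
    small files written into each table partition.
    """
    num_partitions = total_cores * multiplier
    return df.repartition(num_partitions, *partition_cols)
```

For example, with 36 cores and a table partitioned by year and month, repartition_by_table_columns(df, 36, ["year", "month"]) would produce 108 partitions before calling df.write.partitionBy("year", "month"). The column names are placeholders. Watch for skew if a few partition values hold most of the data.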