Optimización del desempeño de Spark - Amazon EMR

Optimización del desempeño de Spark

Amazon EMR proporciona múltiples características de optimización del rendimiento para Spark. En este tema se explica cada característica de optimización en detalle.

Para obtener más información acerca de cómo definir la configuración de Spark, consulte Configurar Spark.

Ejecución de consultas adaptativas

La ejecución de consultas adaptativas es un marco para reoptimizar los planes de consultas en función de las estadísticas del tiempo de ejecución. A partir de Amazon EMR 5.30.0, las siguientes optimizaciones de ejecución de consultas adaptativas de Apache Spark 3 están disponibles en el tiempo de ejecución de Apache EMR para Spark 2.

  • Conversión de unión adaptativa

  • Fusión adaptativa de particiones aleatorias

Conversión de unión adaptativa

La conversión de unión adaptativa mejora el rendimiento de las consultas al convertir las operaciones sort-merge-join en operaciones broadcast-hash-joins en función del tamaño del tiempo de ejecución de las etapas de las consultas. Las operaciones broadcast-hash-joins suelen ofrecer un mejor rendimiento cuando un lado de la unión es lo suficientemente pequeño como para transmitir su salida de manera eficiente entre todos los ejecutores, lo que evita la necesidad de intercambiar y ordenar de forma aleatoria ambos lados de la unión. La conversión adaptativa de uniones amplía la gama de casos en los que Spark realiza automáticamente operaciones broadcast-hash-joins.

Esta característica está habilitada de forma predeterminada. Para deshabilitarla, puede establecer spark.sql.adaptive.enabled en false, lo que también deshabilita el marco de ejecución de consultas adaptativas. Spark decide convertir una operación sort-merge-join en una operación broadcast-hash-join cuando la estadística del tamaño del tiempo de ejecución de uno de los lados de la unión no supera el valor de spark.sql.autoBroadcastJoinThreshold, que adopta 10 485 760 bytes (10 MiB) como valor predeterminado.

Fusión adaptativa de particiones aleatorias

La fusión adaptativa de particiones aleatorias mejora el rendimiento de las consultas al fusionar pequeñas particiones aleatorias contiguas para evitar la sobrecarga que supone tener demasiadas tareas pequeñas. Esto le permite configurar un mayor número de particiones aleatorias iniciales por adelantado, que luego se reduce en tiempo de ejecución hasta el tamaño deseado, lo que aumenta las posibilidades de tener particiones aleatorias distribuidas de manera más uniforme.

Esta característica está habilitada de forma predeterminada a menos que spark.sql.shuffle.partitions se establezca de forma explícita. Para habilitarla, se puede establecer spark.sql.adaptive.coalescePartitions.enabled en true. Tanto el número inicial de particiones aleatorias como el tamaño de las particiones de destino se pueden ajustar con las propiedades spark.sql.adaptive.coalescePartitions.minPartitionNum y spark.sql.adaptive.advisoryPartitionSizeInBytes, respectivamente. Consulta la siguiente tabla para obtener más información sobre las propiedades de Spark relacionadas con esta característica.

Propiedades de las particiones de la fusión adaptativa de Spark
Propiedad Valor predeterminado Descripción

spark.sql.adaptive.coalescePartitions.enabled

true, a menos que spark.sql.shuffle.partitions se establezca de forma explícita

Si el valor es true y spark.sql.adaptive.enabled es true, Spark fusiona las particiones aleatorias contiguas según el tamaño de destino (especificado mediante spark.sql.adaptive.advisoryPartitionSizeInBytes) para evitar demasiadas tareas pequeñas.

spark.sql.adaptive.advisoryPartitionSizeInBytes

64 MB

Tamaño recomendado en bytes de la partición aleatoria al fusionarse. Esta configuración solo tiene efecto cuando spark.sql.adaptive.enabled y spark.sql.adaptive.coalescePartitions.enabled son true.

spark.sql.adaptive.coalescePartitions.minPartitionNum

25

Número mínimo de particiones aleatorias después de la fusión. Esta configuración solo tiene efecto cuando spark.sql.adaptive.enabled y spark.sql.adaptive.coalescePartitions.enabled son true.

spark.sql.adaptive.coalescePartitions.initialPartitionNum

1 000

Número inicial de particiones aleatorias antes de la fusión. Esta configuración solo tiene efecto cuando spark.sql.adaptive.enabled y spark.sql.adaptive.coalescePartitions.enabled son true.

Eliminación dinámica de particiones

La reducción dinámica de particiones mejora el rendimiento seleccionando con mayor precisión particiones específicas de una tabla que deben leerse y procesarse para una consulta específica. Al reducir la cantidad de datos leídos y procesados, se ahorra bastante tiempo en la ejecución de trabajos. Con Amazon EMR 5.26.0, esta característica está habilitada de forma predeterminada. Con Amazon EMR 5.24.0 y 5.25.0, puede establecer la propiedad spark.sql.dynamicPartitionPruning.enabled de Spark desde Spark o al crear clústeres para habilitar esta característica.

Propiedades de las particiones de la eliminación dinámica de particiones de Spark
Propiedad Valor predeterminado Descripción

spark.sql.dynamicPartitionPruning.enabled

true

Si es true, habilite la eliminación dinámica de particiones.

spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse

true

Si es true, Spark realiza una comprobación preventiva antes de ejecutar las consultas para garantizar que la reutilización de los intercambios de transmisiones en filtros de eliminación dinámica no se vea afectada por reglas de preparación posteriores (como las reglas de columnas definidas por el usuario). Cuando la reutilización se ve afectada y esta configuración es true, Spark elimina los filtros de eliminación dinámica afectados para evitar problemas de rendimiento y corrección. Pueden surgir problemas de corrección cuando el intercambio de transmisión del filtro de eliminación dinámica ofrece resultados diferentes e incoherentes a partir del intercambio de transmisión de la operación de unión correspondiente. Definir esta configuración en false debe hacerse con precaución, ya que permite evitar situaciones como, por ejemplo, cuando la reutilización se ve afectada por reglas de columnas definidas por el usuario. Cuando la ejecución de consultas adaptativas está habilitada, siempre se aplica la reutilización de transmisiones.

Esta optimización mejora las capacidades existentes de Spark 2.4.2, que solo admiten la postergación de predicados estáticos que se pueden resolver durante la planificación.

A continuación, se muestran ejemplos de postergación de predicados estáticos en Spark 2.4.2.

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

La reducción dinámica de particiones permite que el motor de Spark deduzca de forma dinámica durante el tiempo de ejecución qué particiones deben leerse y cuáles pueden eliminarse de forma segura. Por ejemplo, la siguiente consulta implica dos tablas: la tabla store_sales que contiene todas las ventas totales para todas las tiendas y se particiona por región y la tabla store_regions que contiene un mapeo de las regiones de cada país. Las tablas contienen datos sobre tiendas distribuidas por todo el mundo, pero solo realizan consultas en los datos de América del Norte.

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

Sin la reducción de particiones dinámicas, esta consulta leerá todas las regiones antes de filtrar el subconjunto de regiones que coinciden con los resultados de la subconsulta. Con la reducción dinámica de particiones, esta consulta leerá y procesará solo las particiones de las regiones que se devuelven en la subconsulta. Esto ahorra tiempo y recursos ya que lee menos datos del almacenamiento y procesa menos registros.

Aplanamiento de subconsultas escalares

Esta optimización mejora el rendimiento de las consultas que tienen subconsultas escalares a través de la misma tabla. Con Amazon EMR 5.26.0, esta característica está habilitada de forma predeterminada. Con Amazon EMR 5.24.0 y 5.25.0, puede establecer la propiedad spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled de Spark desde Spark o al crear clústeres para habilitarlo. Cuando esta propiedad se establece en true, el optimizador de consultas aplana subconsultas escalares de agregación que utilizan la misma relación en caso posible. Las consultas escalares se aplanan empujando cualquier predicado presente en la subconsulta en las funciones de agregación y, a continuación, realiza una agregación, con todas las funciones de agregación, por relación.

A continuación se presenta un ejemplo de una consulta que se beneficiará de esta optimización.

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

La optimización vuelve a escribir la consulta anterior como:

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

Tenga en cuenta que la consulta reescrita lee la tabla solo una vez, y los predicados de las tres subconsultas se envían a la función avg.

DISTINCT antes de INTERSECT

Esta optimización optimiza uniones cuando se utiliza INTERSECT. Con Amazon EMR 5.26.0, esta característica está habilitada de forma predeterminada. Con Amazon EMR 5.24.0 y 5.25.0, puede establecer la propiedad spark.sql.optimizer.distinctBeforeIntersect.enabled de Spark desde Spark o al crear clústeres para habilitarlo. Las consultas con INTERSECT se convierten automáticamente para utilizar una semiunión pendiente. Cuando esta propiedad se establece en true, el optimizador de consultas inserta el operador DISTINCT en los elementos secundarios de INTERSECT si detecta que el operador DISTINCT puede convertir a la semiunión pendiente en una BroadcastHashJoin en lugar de una SortMergeJoin.

A continuación se presenta un ejemplo de una consulta que se beneficiará de esta optimización.

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

Sin habilitar esta propiedad spark.sql.optimizer.distinctBeforeIntersect.enabled, la consulta se reescribirá tal y como se indica a continuación.

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Cuando se habilita esta propiedad spark.sql.optimizer.distinctBeforeIntersect.enabled, la consulta se reescribirá tal y como se indica a continuación.

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Unión de filtros de Bloom

Esta optimización puede mejorar el rendimiento de algunas uniones efectuando un filtrado previo de un lado de una unión mediante un filtro de Bloom generado a partir de los valores del otro lado de la unión. Con Amazon EMR 5.26.0, esta característica está habilitada de forma predeterminada. Con Amazon EMR 5.25.0, puede establecer la propiedad de Spark spark.sql.bloomFilterJoin.enabled en true desde Spark o al crear clústeres para habilitar esta característica.

A continuación se muestra un ejemplo de consulta que puede beneficiarse de un filtro de Bloom.

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

Cuando esta característica está habilitada, el filtro de Bloom se crea a partir de todos los identificadores de elemento cuya categoría se encuentra en el conjunto de categorías que se están consultando. Durante el análisis de la tabla de ventas, el filtro de Bloom se utiliza para determinar qué ventas corresponden a artículos que definitivamente no están en el conjunto definido por el filtro de Bloom. De este modo, estas ventas identificadas pueden excluirse lo antes posible.

Reordenación de unión optimizada

Esta optimización puede mejorar el rendimiento de las consultas al reordenar las uniones en las que intervienen tablas con filtros. Con Amazon EMR 5.26.0, esta característica está habilitada de forma predeterminada. Con Amazon EMR 5.25.0, puede establecer el parámetro de configuración spark.sql.optimizer.sizeBasedJoinReorder.enabled de Spark en true para habilitar esta característica. El comportamiento predeterminado en Spark es unir las tablas de izquierda a derecha, como se indica en la consulta. Esta estrategia puede perder oportunidades para ejecutar primero uniones más pequeñas con filtros, con el fin de beneficiarse de uniones más costosas más tarde.

La consulta de ejemplo que aparece a continuación informa de todos los artículos devueltos de todas las tiendas de un país. Sin la reordenación de unión optimizada, Spark une primero las dos tablas grandes store_sales y store_returns; a continuación, las une con store y, finalmente, con item.

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

Con la reordenación de unión optimizada, Spark primero une store_sales con store, ya que store tiene un filtro y es más pequeño que store_returns y broadcastable. Luego Spark se une con store_returns y, finalmente, con item. Si item tuviera un filtro y se pudiera emitir, también se podría usar en la reordenación, lo que provocaría que store_sales se uniera con store, después con item y, finalmente, con store_returns.