Optimización del desempeño de Spark
Amazon EMR proporciona múltiples características de optimización del rendimiento para Spark. En este tema se explica cada característica de optimización en detalle.
Para obtener más información acerca de cómo definir la configuración de Spark, consulte Configurar Spark.
Ejecución de consultas adaptativas
La ejecución de consultas adaptativas es un marco para reoptimizar los planes de consultas en función de las estadísticas del tiempo de ejecución. A partir de Amazon EMR 5.30.0, las siguientes optimizaciones de ejecución de consultas adaptativas de Apache Spark 3 están disponibles en el tiempo de ejecución de Apache EMR para Spark 2.
-
Conversión de unión adaptativa
-
Fusión adaptativa de particiones aleatorias
Conversión de unión adaptativa
La conversión de unión adaptativa mejora el rendimiento de las consultas al convertir las operaciones sort-merge-join en operaciones broadcast-hash-joins en función del tamaño del tiempo de ejecución de las etapas de las consultas. Las operaciones broadcast-hash-joins suelen ofrecer un mejor rendimiento cuando un lado de la unión es lo suficientemente pequeño como para transmitir su salida de manera eficiente entre todos los ejecutores, lo que evita la necesidad de intercambiar y ordenar de forma aleatoria ambos lados de la unión. La conversión adaptativa de uniones amplía la gama de casos en los que Spark realiza automáticamente operaciones broadcast-hash-joins.
Esta característica está habilitada de forma predeterminada. Para deshabilitarla, puede establecer spark.sql.adaptive.enabled
en false
, lo que también deshabilita el marco de ejecución de consultas adaptativas. Spark decide convertir una operación sort-merge-join en una operación broadcast-hash-join cuando la estadística del tamaño del tiempo de ejecución de uno de los lados de la unión no supera el valor de spark.sql.autoBroadcastJoinThreshold
, que adopta 10 485 760 bytes (10 MiB) como valor predeterminado.
Fusión adaptativa de particiones aleatorias
La fusión adaptativa de particiones aleatorias mejora el rendimiento de las consultas al fusionar pequeñas particiones aleatorias contiguas para evitar la sobrecarga que supone tener demasiadas tareas pequeñas. Esto le permite configurar un mayor número de particiones aleatorias iniciales por adelantado, que luego se reduce en tiempo de ejecución hasta el tamaño deseado, lo que aumenta las posibilidades de tener particiones aleatorias distribuidas de manera más uniforme.
Esta característica está habilitada de forma predeterminada a menos que spark.sql.shuffle.partitions
se establezca de forma explícita. Para habilitarla, se puede establecer spark.sql.adaptive.coalescePartitions.enabled
en true
. Tanto el número inicial de particiones aleatorias como el tamaño de las particiones de destino se pueden ajustar con las propiedades spark.sql.adaptive.coalescePartitions.minPartitionNum
y spark.sql.adaptive.advisoryPartitionSizeInBytes
, respectivamente. Consulta la siguiente tabla para obtener más información sobre las propiedades de Spark relacionadas con esta característica.
Propiedades de las particiones de la fusión adaptativa de Spark | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Propiedad | Valor predeterminado | Descripción | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
true, a menos que |
Si el valor es true y spark.sql.adaptive.enabled es true, Spark fusiona las particiones aleatorias contiguas según el tamaño de destino (especificado mediante |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
64 MB |
Tamaño recomendado en bytes de la partición aleatoria al fusionarse. Esta configuración solo tiene efecto cuando |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
25 |
Número mínimo de particiones aleatorias después de la fusión. Esta configuración solo tiene efecto cuando |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
1 000 |
Número inicial de particiones aleatorias antes de la fusión. Esta configuración solo tiene efecto cuando |
Eliminación dinámica de particiones
La reducción dinámica de particiones mejora el rendimiento seleccionando con mayor precisión particiones específicas de una tabla que deben leerse y procesarse para una consulta específica. Al reducir la cantidad de datos leídos y procesados, se ahorra bastante tiempo en la ejecución de trabajos. Con Amazon EMR 5.26.0, esta característica está habilitada de forma predeterminada. Con Amazon EMR 5.24.0 y 5.25.0, puede establecer la propiedad spark.sql.dynamicPartitionPruning.enabled
de Spark desde Spark o al crear clústeres para habilitar esta característica.
Propiedades de las particiones de la eliminación dinámica de particiones de Spark | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Propiedad | Valor predeterminado | Descripción | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
Si es true, habilite la eliminación dinámica de particiones. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
Si es |
Esta optimización mejora las capacidades existentes de Spark 2.4.2, que solo admiten la postergación de predicados estáticos que se pueden resolver durante la planificación.
A continuación, se muestran ejemplos de postergación de predicados estáticos en Spark 2.4.2.
partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3
La reducción dinámica de particiones permite que el motor de Spark deduzca de forma dinámica durante el tiempo de ejecución qué particiones deben leerse y cuáles pueden eliminarse de forma segura. Por ejemplo, la siguiente consulta implica dos tablas: la tabla store_sales
que contiene todas las ventas totales para todas las tiendas y se particiona por región y la tabla store_regions
que contiene un mapeo de las regiones de cada país. Las tablas contienen datos sobre tiendas distribuidas por todo el mundo, pero solo realizan consultas en los datos de América del Norte.
select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'
Sin la reducción de particiones dinámicas, esta consulta leerá todas las regiones antes de filtrar el subconjunto de regiones que coinciden con los resultados de la subconsulta. Con la reducción dinámica de particiones, esta consulta leerá y procesará solo las particiones de las regiones que se devuelven en la subconsulta. Esto ahorra tiempo y recursos ya que lee menos datos del almacenamiento y procesa menos registros.
Aplanamiento de subconsultas escalares
Esta optimización mejora el rendimiento de las consultas que tienen subconsultas escalares a través de la misma tabla. Con Amazon EMR 5.26.0, esta característica está habilitada de forma predeterminada. Con Amazon EMR 5.24.0 y 5.25.0, puede establecer la propiedad spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled
de Spark desde Spark o al crear clústeres para habilitarlo. Cuando esta propiedad se establece en true, el optimizador de consultas aplana subconsultas escalares de agregación que utilizan la misma relación en caso posible. Las consultas escalares se aplanan empujando cualquier predicado presente en la subconsulta en las funciones de agregación y, a continuación, realiza una agregación, con todas las funciones de agregación, por relación.
A continuación se presenta un ejemplo de una consulta que se beneficiará de esta optimización.
select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3
La optimización vuelve a escribir la consulta anterior como:
select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);
Tenga en cuenta que la consulta reescrita lee la tabla solo una vez, y los predicados de las tres subconsultas se envían a la función avg
.
DISTINCT antes de INTERSECT
Esta optimización optimiza uniones cuando se utiliza INTERSECT. Con Amazon EMR 5.26.0, esta característica está habilitada de forma predeterminada. Con Amazon EMR 5.24.0 y 5.25.0, puede establecer la propiedad spark.sql.optimizer.distinctBeforeIntersect.enabled
de Spark desde Spark o al crear clústeres para habilitarlo. Las consultas con INTERSECT se convierten automáticamente para utilizar una semiunión pendiente. Cuando esta propiedad se establece en true, el optimizador de consultas inserta el operador DISTINCT en los elementos secundarios de INTERSECT si detecta que el operador DISTINCT puede convertir a la semiunión pendiente en una BroadcastHashJoin en lugar de una SortMergeJoin.
A continuación se presenta un ejemplo de una consulta que se beneficiará de esta optimización.
(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)
Sin habilitar esta propiedad spark.sql.optimizer.distinctBeforeIntersect.enabled
, la consulta se reescribirá tal y como se indica a continuación.
select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand
Cuando se habilita esta propiedad spark.sql.optimizer.distinctBeforeIntersect.enabled
, la consulta se reescribirá tal y como se indica a continuación.
select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand
Unión de filtros de Bloom
Esta optimización puede mejorar el rendimiento de algunas uniones efectuando un filtrado previo de un lado de una unión mediante un filtro de Bloomspark.sql.bloomFilterJoin.enabled
en true
desde Spark o al crear clústeres para habilitar esta característica.
A continuación se muestra un ejemplo de consulta que puede beneficiarse de un filtro de Bloom.
select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)
Cuando esta característica está habilitada, el filtro de Bloom se crea a partir de todos los identificadores de elemento cuya categoría se encuentra en el conjunto de categorías que se están consultando. Durante el análisis de la tabla de ventas, el filtro de Bloom se utiliza para determinar qué ventas corresponden a artículos que definitivamente no están en el conjunto definido por el filtro de Bloom. De este modo, estas ventas identificadas pueden excluirse lo antes posible.
Reordenación de unión optimizada
Esta optimización puede mejorar el rendimiento de las consultas al reordenar las uniones en las que intervienen tablas con filtros. Con Amazon EMR 5.26.0, esta característica está habilitada de forma predeterminada. Con Amazon EMR 5.25.0, puede establecer el parámetro de configuración spark.sql.optimizer.sizeBasedJoinReorder.enabled
de Spark en true para habilitar esta característica. El comportamiento predeterminado en Spark es unir las tablas de izquierda a derecha, como se indica en la consulta. Esta estrategia puede perder oportunidades para ejecutar primero uniones más pequeñas con filtros, con el fin de beneficiarse de uniones más costosas más tarde.
La consulta de ejemplo que aparece a continuación informa de todos los artículos devueltos de todas las tiendas de un país. Sin la reordenación de unión optimizada, Spark une primero las dos tablas grandes store_sales
y store_returns
; a continuación, las une con store
y, finalmente, con item
.
select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'
Con la reordenación de unión optimizada, Spark primero une store_sales
con store
, ya que store
tiene un filtro y es más pequeño que store_returns
y broadcastable
. Luego Spark se une con store_returns
y, finalmente, con item
. Si item
tuviera un filtro y se pudiera emitir, también se podría usar en la reordenación, lo que provocaría que store_sales
se uniera con store
, después con item
y, finalmente, con store_returns
.