Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Eliminar particiones dinámicas
Spark La versión 3.0 y las versiones posteriores incluyen Dynamic Partition Pruning (DPP). La poda dinámica de particiones es una técnica de optimización en Spark que impide escanear particiones innecesarias al leer datos. Los siguientes son algunos aspectos clave que debe conocer acerca de DPP:
-
Examina los valores de partición solicitados en los filtros y predicados de la consulta y determina qué particiones son necesarias para satisfacer la consulta. Las particiones que se consideren innecesarias se eliminan de forma automática y transparente.
-
DPP reduce el tiempo de procesamiento y la utilización de recursos al omitir las particiones que no contienen datos aplicables. Esto ayuda Spark centrarse únicamente en las particiones relevantes.
-
Funciona tanto con particiones estáticas como con particiones generadas dinámicamente que se añaden mediante inserciones o cargas incrementales. Spark reconoce las particiones nuevas y puede seguir aplicando la depuración dinámica.
-
DPP es completamente transparente o invisible para los desarrolladores. No se necesita ningún código especial para activar DPP. Se produce automáticamente entre bastidores como una optimización durante la generación del plan de consultas.
Las siguientes son algunas de las mejores prácticas para garantizar que DPP funcione de manera eficiente:
-
Utilice la función de pulsar predicados aplicando filtros al principio de su Spark operaciones con marcos de datos. Esto ayuda Spark para eliminar las particiones de forma temprana mediante los metadatos de las particiones.
-
Recopile estadísticas sobre sus datos ejecutándolos
ANALYZE TABLE
con frecuencia. Esto reduce las estadísticas a nivel de columna que ayudan Spark para determinar con mayor precisión qué particiones se pueden ignorar. -
Evite particionar sus datos en exceso. Demasiadas particiones pueden sobrecargar el nodo controlador a la hora de recopilar estadísticas. Intente utilizar de 10 a 100 particiones para cada tabla grande.
-
Reparticione los marcos de datos antes de las uniones. Esto evita las uniones aleatorias que requieren mover todos los datos y optimiza aún más la cantidad de datos que se leen.
-
Utilice tipos de columnas de partición y nombres coherentes en las diferentes tablas que vaya a unir. Esto ayuda Spark haga coincidir mejor las particiones para la optimización de las uniones.
-
Pruebe las consultas
EXPLAIN
para asegurarse de que se está aplicando el DPP y compruebe si es necesario realizar ajustes adicionales.
En un esquema en estrella, las tablas se dividen en dos tipos principales: tablas de hechos y tablas de dimensiones. Las tablas de dimensiones suelen ser mucho más pequeñas que las tablas de hechos. Al unir una tabla de hechos a una tabla de dimensiones, DPP optimiza el plan de consulta. Crea una subconsulta a partir de cualquier filtro que se aplique a la tabla de dimensiones. Transmite esta subconsulta y crea una tabla hash a partir de ella. A continuación, aplica la tabla hash a la fase de escaneo de la tabla de hechos, antes de leer los datos de la tabla de hechos. Esto ayuda a DPP a reducir la cantidad de datos que deben leerse de la tabla de hechos más grande.
La siguiente consulta de ejemplo muestra a DPP en acción. La consulta obtiene el número de pedidos del país (India) e incluye una combinación interna entre una tabla de datos (fact_orders
) y una tabla de dimensiones (nation
). La fact_orders
tabla está dividida por columnas. o_nationkey
- "select n.n_name as country, count(1) as no_of_orders from fact_orders o join nation n on o.o_nationkey = n.n_nationkey where n.n_name = 'INDIA' group by n.n_name"
Los siguientes son los pasos que se utilizan en el EXPLAIN
plan:
-
Escanee la tabla de dimensiones más pequeñas (
nation
) y filtre por columnan_name = 'INDIA'
. -
Transmita los resultados del paso anterior.
-
Cree una subconsulta que filtre los resultados del primer paso.
-
Empújelo hacia abajo
PartitionFilter
para que solo escanee las particiones de la tabla de hechos que sean necesarias, en lugar de escanear una tabla completa.
El siguiente es el EXPLAIN
plan para esta consulta optimizada para DPP.
== Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(4) HashAggregate(keys=[], functions=[count(1)], output=[count#208L]) +- ShuffleQueryStage 3 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#353] +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#212L]) +- *(3) HashAggregate(keys=[n_name#31], functions=[], output=[]) +- ShuffleQueryStage 1 +- Exchange hashpartitioning(n_name#31, 36), ENSURE_REQUIREMENTS, [id=#315] +- *(2) HashAggregate(keys=[n_name#31], functions=[], output=[n_name#31]) +- *(2) Project [n_name#31] +- *(2) BroadcastHashJoin [cast(o_nationkey#145 as bigint)], [n_nationkey#32L], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet [o_nationkey#145] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[s3://aws-spark-tuning/fact_orders], PartitionFilters: [isnotnull(o_nationkey#145), dynamicpruningexpression(cast(o_nationkey#145 as bigint) IN dynamicp..., PushedFilters: [], ReadSchema: struct<> : +- SubqueryBroadcast dynamicpruning#210, 0, [n_nationkey#32L], [id=#200] : +- OutputAdapter [n_name#31, n_nationkey#32L] : +- AdaptiveSparkPlan isFinalPlan=true : +- BroadcastQueryStage 2 : +- ReusedExchange [n_name#31, n_nationkey#32L], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- BroadcastQueryStage 0 +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- *(1) Filter ((isnotnull(n_name#31) AND (n_name#31 = INDIA)) AND isnotnull(n_nationkey#32L)) +- FileScan json [n_name#31,n_nationkey#32L] Batched: false, DataFilters: [isnotnull(n_name#31), (n_name#31 = INDIA), isnotnull(n_nationkey#32L)], Format: JSON, Location: InMemoryFileIndex[s3://aws-spark-tuning/input/demo/json/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_name), EqualTo(n_name,INDIA), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,n_nationkey:bigint>
Aunque no se haya agregado ningún filtro directo a la o_nationkey
columna, debido a la función DPP, Spark escanea automáticamente solo las particiones que se necesitan, en lugar de analizar toda la tabla.