Potatura delle partizioni dinamiche - AWS Guida prescrittiva

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Potatura delle partizioni dinamiche

Spark 3.0 e versioni successive includono Dynamic Partition Pruning (DPP). La potatura dinamica delle partizioni è una tecnica di ottimizzazione in Spark che impedisce la scansione di partizioni non necessarie durante la lettura dei dati. Di seguito sono riportate alcune cose fondamentali da sapere su DPP:

  • Esamina i valori di partizione richiesti nei filtri e nei predicati di interrogazione e determina quali partizioni sono necessarie per soddisfare la query. Tutte le partizioni ritenute non necessarie vengono eliminate automaticamente e in modo trasparente.

  • DPP riduce i tempi di elaborazione e l'utilizzo delle risorse ignorando le partizioni che non contengono dati applicabili. Questo aiuta Spark concentrarsi solo sulle partizioni pertinenti.

  • Funziona sia con partizioni statiche che con partizioni generate dinamicamente che vengono aggiunte tramite inserimenti o carichi incrementali. Spark riconosce nuove partizioni e può continuare ad applicare la potatura dinamica.

  • DPP è completamente trasparente o invisibile per gli sviluppatori. Non è necessaria alcuna codifica speciale per abilitare DPP. Si verifica automaticamente dietro le quinte come ottimizzazione durante la generazione del piano di query.

Di seguito sono riportate alcune best practice per garantire che DPP funzioni in modo efficiente:

  • Utilizza il predicate pushdown applicando i filtri nelle prime fasi del Spark operazioni sui frame di dati. Questo aiuta Spark per eliminare le partizioni in anticipo utilizzando i metadati delle partizioni.

  • Raccogli statistiche sui tuoi dati eseguendoli frequentemente. ANALYZE TABLE Ciò riduce le statistiche a livello di colonna che aiutano Spark per determinare con maggiore precisione quali partizioni possono essere ignorate.

  • Evita di partizionare eccessivamente i dati. Troppe partizioni possono sovraccaricare il nodo driver quando raccoglie le statistiche. Cerca di creare 10—100 partizioni per ogni tabella grande.

  • Ripartiziona i frame di dati prima delle unioni. In questo modo si evitano i shuffle join che richiedono lo spostamento di tutti i dati e si ottimizza ulteriormente la quantità di dati letti.

  • Utilizza tipi di colonne di partizione e denominazione coerenti tra le diverse tabelle da unire. Questo aiuta Spark abbina meglio le partizioni per l'ottimizzazione delle giunzioni.

  • Prova le query con EXPLAIN per assicurarti che il DPP venga applicato e verifica se è necessaria un'ulteriore ottimizzazione.

In uno schema a stella, le tabelle sono suddivise in due tipi principali: tabelle dei fatti e tabelle delle dimensioni. Le tabelle dimensionali tendono ad essere molto più piccole delle tabelle dei fatti. Quando si unisce una tabella dei fatti a una tabella delle dimensioni, DPP ottimizza il piano di interrogazione. Crea una sottoquery da tutti i filtri applicati alla tabella delle dimensioni. Trasmette questa sottoquery e ne crea una tabella hash. Quindi applica la tabella hash alla fase di scansione della tabella dei fatti, prima di leggere i dati della tabella dei fatti. Questo aiuta DPP a ridurre la quantità di dati che devono essere letti dalla tabella dei fatti più ampia.

La seguente query di esempio mostra DPP in azione. La query recupera il numero di ordini dal paese (India) e include un'unione interna tra una tabella dei fatti (fact_orders) e una tabella delle dimensioni (nation). La fact_orders tabella è partizionata in base alla colonna. o_nationkey

- "select n.n_name as country, count(1) as no_of_orders from fact_orders o join nation n on o.o_nationkey = n.n_nationkey where n.n_name = 'INDIA' group by n.n_name"

Di seguito sono riportati i passaggi utilizzati nel EXPLAIN piano:

  1. Scansiona la tabella delle dimensioni più piccole (nation) e filtra per colonnan_name = 'INDIA'.

  2. Trasmetti i risultati del passaggio precedente.

  3. Crea una sottoquery che filtra i risultati del primo passaggio.

  4. Premila verso il basso PartitionFilter in modo che esegua la scansione solo delle partizioni della tabella dei fatti necessarie, invece di eseguire una scansione completa della tabella.

Di seguito è riportato il EXPLAIN piano per questa interrogazione ottimizzata per DPP.

== Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(4) HashAggregate(keys=[], functions=[count(1)], output=[count#208L]) +- ShuffleQueryStage 3 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#353] +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#212L]) +- *(3) HashAggregate(keys=[n_name#31], functions=[], output=[]) +- ShuffleQueryStage 1 +- Exchange hashpartitioning(n_name#31, 36), ENSURE_REQUIREMENTS, [id=#315] +- *(2) HashAggregate(keys=[n_name#31], functions=[], output=[n_name#31]) +- *(2) Project [n_name#31] +- *(2) BroadcastHashJoin [cast(o_nationkey#145 as bigint)], [n_nationkey#32L], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet [o_nationkey#145] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[s3://aws-spark-tuning/fact_orders], PartitionFilters: [isnotnull(o_nationkey#145), dynamicpruningexpression(cast(o_nationkey#145 as bigint) IN dynamicp..., PushedFilters: [], ReadSchema: struct<> : +- SubqueryBroadcast dynamicpruning#210, 0, [n_nationkey#32L], [id=#200] : +- OutputAdapter [n_name#31, n_nationkey#32L] : +- AdaptiveSparkPlan isFinalPlan=true : +- BroadcastQueryStage 2 : +- ReusedExchange [n_name#31, n_nationkey#32L], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- BroadcastQueryStage 0 +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- *(1) Filter ((isnotnull(n_name#31) AND (n_name#31 = INDIA)) AND isnotnull(n_nationkey#32L)) +- FileScan json [n_name#31,n_nationkey#32L] Batched: false, DataFilters: [isnotnull(n_name#31), (n_name#31 = INDIA), isnotnull(n_nationkey#32L)], Format: JSON, Location: InMemoryFileIndex[s3://aws-spark-tuning/input/demo/json/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_name), EqualTo(n_name,INDIA), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,n_nationkey:bigint>

Anche se non è stato aggiunto alcun filtro diretto alla o_nationkey colonna, grazie alla funzionalità DPP, Spark esegue automaticamente la scansione solo delle partizioni necessarie, anziché dell'intera tabella.