Ottimizzazione delle prestazioni di Spark - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Ottimizzazione delle prestazioni di Spark

Amazon EMR offre diverse funzionalità di ottimizzazione delle prestazioni per Spark. In questo argomento viene descritta in dettaglio ogni caratteristica di ottimizzazione.

Per ulteriori informazioni su come impostare la configurazione Spark, consulta Configurazione di Spark.

Esecuzione adattiva delle query

L'esecuzione adattiva delle query è un framework per riottimizzare i piani di query in base alle statistiche di runtime. A partire da Amazon EMR 5.30.0, le seguenti ottimizzazioni adattive dell'esecuzione delle query di Apache Spark 3 sono disponibili su Apache Amazon Runtime for Spark 2. EMR

  • Conversione adattiva di join

  • Unione adattiva delle partizioni shuffle

Conversione adattiva di join

La conversione adattiva dei join migliora le prestazioni delle query convertendo le operazioni in sort-merge-join operazioni basate sulle dimensioni di runtime delle fasi di query. broadcast-hash-joins roadcast-hash-joins I B tendono a funzionare meglio quando un lato del join è sufficientemente piccolo da trasmettere in modo efficiente il relativo output a tutti gli executor, evitando così la necessità di mescolare, scambiare e ordinare entrambi i lati del join. La conversione adattiva dei join amplia la gamma di casi in cui Spark funziona automaticamente. broadcast-hash-joins

Questa caratteristica viene attivata per impostazione predefinita. Può essere disabilitata impostando spark.sql.adaptive.enabled su false, operazione che disabilita anche il framework di esecuzione adattiva delle query. Spark decide di convertire a in sort-merge-join a broadcast-hash-join quando la statistica della dimensione di runtime di uno dei lati di unione non superaspark.sql.autoBroadcastJoinThreshold, che per impostazione predefinita è 10.485.760 byte (10 MiB).

Unione adattiva delle partizioni shuffle

L'unione adattiva delle partizioni shuffle migliora le prestazioni delle query unendo piccole partizioni shuffle contigue per evitare il sovraccarico dato da un numero eccessivo di attività minori. Ciò consente di configurare un numero più elevato di partizioni shuffle iniziali che poi vengono ridotte in fase di runtime a una dimensione mirata, migliorando le possibilità di avere partizioni shuffle distribuite in modo più uniforme.

Questa funzionalità è attivata per impostazione predefinita, a meno che spark.sql.shuffle.partitions non sia impostato in modo esplicito. Può essere abilitata impostando spark.sql.adaptive.coalescePartitions.enabled su true. Sia il numero iniziale di partizioni shuffle che la dimensione delle partizioni di destinazione possono essere regolati utilizzando rispettivamente le proprietà spark.sql.adaptive.coalescePartitions.minPartitionNum e spark.sql.adaptive.advisoryPartitionSizeInBytes. Per ulteriori dettagli sulle proprietà Spark correlate a questa caratteristica, consulta la tabella riportata di seguito.

Proprietà della partizione di unione adattiva Spark
Proprietà Valore predefinito Descrizione

spark.sql.adaptive.coalescePartitions.enabled

true, a meno che spark.sql.shuffle.partitions non sia impostato in modo esplicito

Quando true e spark.sql.adaptive.enabled sono impostati su true, Spark unisce le partizioni shuffle contigue in base alla dimensione di destinazione (specificata da spark.sql.adaptive.advisoryPartitionSizeInBytes), in modo da evitare l'accumulo di attività minori.

spark.sql.adaptive.advisoryPartitionSizeInBytes

64 MB

La dimensione degli advisory in byte della partizione shuffle durante l'unione. Questa configurazione ha effetto solo quando spark.sql.adaptive.enabled e spark.sql.adaptive.coalescePartitions.enabled sono entrambi true.

spark.sql.adaptive.coalescePartitions.minPartitionNum

25

Il numero minimo di partizioni shuffle dopo l'unione. Questa configurazione ha effetto solo quando spark.sql.adaptive.enabled e spark.sql.adaptive.coalescePartitions.enabled sono entrambi true.

spark.sql.adaptive.coalescePartitions.initialPartitionNum

1000

Il numero iniziale di partizioni shuffle prima dell'unione. Questa configurazione ha effetto solo quando spark.sql.adaptive.enabled e spark.sql.adaptive.coalescePartitions.enabled sono entrambi true.

Eliminazione delle partizioni dinamiche

L'eliminazione delle partizioni dinamiche migliora le prestazioni del processo selezionando accuratamente partizioni specifiche all'interno di una tabella che devono essere lette ed elaborate per una query specifica. La riduzione della quantità di dati letti ed elaborati determina un risparmio di tempo significativo nell'esecuzione del processo. Con Amazon EMR 5.26.0, questa funzionalità è abilitata per impostazione predefinita. Con Amazon EMR 5.24.0 e 5.25.0, puoi abilitare questa funzionalità impostando la proprietà Spark spark.sql.dynamicPartitionPruning.enabled dall'interno di Spark o durante la creazione di cluster.

Proprietà della partizione di eliminazione dinamica di partizioni Spark
Proprietà Valore predefinito Descrizione

spark.sql.dynamicPartitionPruning.enabled

true

Quando è true, abilitare l'eliminazione dinamica delle partizioni.

spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse

true

Quando è true, Spark esegue un controllo difensivo prima dell'esecuzione delle query per garantire che il riutilizzo degli scambi broadcast nei filtri di eliminazione dinamica non venga interrotto da regole di preparazione successive, ad esempio regole colonnari definite dall'utente. Quando il riutilizzo è interrotto e questa configurazione è true, Spark rimuove i filtri di eliminazione dinamici interessati per evitare problemi di prestazioni e correttezza. I problemi di correttezza possono sorgere quando lo scambio broadcast del filtro di eliminazione dinamica produce risultati diversi e incoerenti dallo scambio broadcast dell'operazione di join corrispondente. L'impostazione di questa configurazione su false deve essere eseguita con cautela, in quanto consente di aggirare gli scenari, ad esempio quando il riutilizzo viene interrotto da regole colonnari definite dall'utente. Quando Adaptive Query Execution (Esecuzione adattiva delle query) è abilitata, il riutilizzo broadcast viene sempre applicato.

Questa ottimizzazione migliora le funzionalità esistenti di Spark 2.4.2, che supportano solo la distribuzione dei predicati che possono essere risolti in fase di pianificazione.

Di seguito sono elencati alcuni esempi di distribuzione di predicati statici in Spark 2.4.2.

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

L'eliminazione delle partizioni dinamiche consente al motore Spark di dedurre dinamicamente in fase di runtime quali partizioni devono essere lette e quali possono essere eliminate. Ad esempio, la seguente query comporta due tabelle: la tabella store_sales contenente tutte le vendite totali di tutti gli store, partizionata in base alla regione, e la tabella store_regions contenente una mappatura di regioni per ogni paese. Le tabelle contengono i dati relativi agli store distribuiti in tutto il mondo, ma vengono eseguite query solo per il Nord America.

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

Senza l'eliminazione delle partizioni dinamiche, questa query leggerà tutte le regioni prima di filtrare il sottoinsieme di regioni che corrispondono ai risultati della subquery. Con l'eliminazione delle partizioni dinamiche, questa query legge ed elabora solo le partizioni per le regioni restituite nella subquery. Questo consente di risparmiare tempo e risorse leggendo meno dati dallo storage ed elaborando meno record.

Appiattimento delle subquery scalari

Questa ottimizzazione migliora le prestazioni di query che dispongono di subquery scalari sulla stessa tabella. Con Amazon EMR 5.26.0, questa funzionalità è abilitata per impostazione predefinita. Con Amazon EMR 5.24.0 e 5.25.0, puoi abilitarlo impostando la proprietà Spark spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled dall'interno di Spark o durante la creazione di cluster. Quando questa proprietà è impostata su true, l'ottimizzatore di query appiattisce le subquery scalari accumulate che utilizzano la stessa relazione se possibile. Le subquery scalari vengono appiattite distribuendo gli eventuali predicati presenti nella subquery nelle funzioni di aggregazione e quindi eseguendo una aggregazione, con tutte le funzioni di aggregazione, per relazione.

Di seguito è riportato un esempio di query che sfrutta questa ottimizzazione.

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

L'ottimizzazione riscrive la query precedente come:

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

Nota che la query riscritta legge la tabella studenti una sola volta e i predicati delle tre subquery vengono distribuiti nella funzione avg.

DISTINCTprima INTERSECT

Questa ottimizzazione ottimizza i join durante l'utilizzo. INTERSECT Con Amazon EMR 5.26.0, questa funzionalità è abilitata per impostazione predefinita. Con Amazon EMR 5.24.0 e 5.25.0, puoi abilitarlo impostando la proprietà Spark spark.sql.optimizer.distinctBeforeIntersect.enabled dall'interno di Spark o durante la creazione di cluster. Le query utilizzate INTERSECT vengono convertite automaticamente per utilizzare un join left-semi. Quando questa proprietà è impostata su true, l'ottimizzatore di query invia l'DISTINCToperatore ai figli INTERSECT se rileva che l'DISTINCToperatore può creare il left-semi join a invece di un. BroadcastHashJoin SortMergeJoin

Di seguito è riportato un esempio di query che sfrutta questa ottimizzazione.

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

Senza l'abilitazione di questa proprietà spark.sql.optimizer.distinctBeforeIntersect.enabled, la query sarà riscritta come segue.

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Quando abiliti questa proprietà spark.sql.optimizer.distinctBeforeIntersect.enabled, la query sarà riscritta come segue.

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Join del filtro Bloom

Questa ottimizzazione può migliorare le prestazioni di alcuni join mediante il prefiltraggio di un lato di un join utilizzando un filtro Bloom generato dal valori dell'altro lato del join. Con Amazon EMR 5.26.0, questa funzionalità è abilitata per impostazione predefinita. Con Amazon EMR 5.25.0, puoi abilitare questa funzionalità impostando la proprietà spark.sql.bloomFilterJoin.enabled Spark all'interno di Spark o true durante la creazione di cluster.

Di seguito è riportato un esempio di query che può trarre vantaggio da un filtro Bloom.

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

Quando questa caratteristica è abilitata, il filtro Bloom viene creato da tutti gli ID degli elementi la cui categoria è presente nel set delle categorie su cui viene eseguita la query. Durante la scansione della tabella SALES, il filtro Bloom viene utilizzato per determinare quali sono le vendite relative a elementi che non sono presenti nel set definito dal filtro Bloom. Pertanto, tali vendite identificate possono essere escluse dal filtro il prima possibile.

Riordinamento ottimizzato dei join

Questa ottimizzazione può migliorare le prestazioni delle query riordinando i join che includono tabelle con filtri. Con Amazon EMR 5.26.0, questa funzionalità è abilitata per impostazione predefinita. Con Amazon EMR 5.25.0, puoi abilitare questa funzionalità impostando il parametro spark.sql.optimizer.sizeBasedJoinReorder.enabled di configurazione Spark su true. Il comportamento predefinito in Spark è di eseguire il join delle tabelle da sinistra a destra, come elencato nella query. Questa strategia potrebbe non dare l'opportunità di eseguire prima i join più piccoli con i filtri, in modo da trarre vantaggio per join più onerosi in un momento successivo.

La seguente query di esempio segnala tutti gli elementi restituiti da tutti gli store in un paese. Senza il riordinamento ottimizzato dei join, Spark esegue prima il join delle due tabelle di grandi dimensioni, store_sales e store_returns, quindi ne esegue il join con store e infine con item.

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

Con il riordinamento ottimizzato dei join, Spark esegue il join di store_sales con store per prima cosa poiché store ha un filtro ed è inferiore a store_returns e a broadcastable. Quindi Spark esegue il join con store_returns e infine con item. Se item avesse avuto un filtro e fosse stato trasmissibile, sarebbe stato qualificato per il riordinamento, con il risultato di un join di store_sales con store, quindi con item e infine con store_returns.