Caching dei frammenti dei risultati Spark
Amazon EMR 6.6.0 e versioni successive includono la funzione opzionale Caching dei frammenti dei risultati Spark che memorizza automaticamente nella cache i frammenti dei risultati. Questi frammenti di risultati sono parti dei risultati di sottoalberi di query archiviati in un bucket Amazon S3 di tua scelta. I frammenti dei risultati della query memorizzati vengono riutilizzati nelle successive esecuzioni di query, rilasciando query più veloci.
Il Caching dei frammenti dei risultati funziona analizzando le query Spark SQL e memorizzando nella cache i frammenti di risultati idonei nella posizione S3 specificata. Nelle successive esecuzioni di query, i frammenti dei risultati della query utilizzabili vengono rilevati automaticamente e recuperati da S3. Il Caching dei frammenti di risultato è diverso dal Caching dei set di risultati, in cui le query successive devono corrispondere esattamente alla query originale per restituire risultati dalla cache. Se utilizzato per query che si rivolgono ripetutamente a un sottoinsieme statico di dati, la memorizzazione nella cache dei frammenti di risultato accelera notevolmente le prestazioni.
Si consideri la seguente query, che conta gli ordini fino all'anno 2022:
select l_returnflag, l_linestatus, count(*) as count_order from lineitem where l_shipdate <= current_date and year(l_shipdate) == '2022' group by l_returnflag, l_linestatus
Con il passare del tempo, questa query deve essere eseguita ogni giorno per segnalare le vendite totali per l'anno. Senza Caching dei frammenti dei risultati, i risultati per tutti i giorni dell'anno dovranno essere ricalcolati ogni giorno. La query diventerà più lenta nel tempo e sarà più lenta alla fine dell'anno, quando tutti i 365 giorni di risultati dovranno essere ricalcolati.
Quando si attiva Caching dei frammenti dei risultati, si utilizzano i risultati di tutti i giorni precedenti dell'anno dalla cache. Ogni giorno, la funzione deve ricalcolare solo un giorno di risultati. Dopo che la funzione ha calcolato il frammento del risultato, questa memorizza nella cache il frammento. Di conseguenza, i tempi di query abilitati per la cache sono rapidi e rimangono costanti per ogni query successiva.
Abilitazione del Caching dei frammenti dei risultati Spark
Per abilitare la Caching dei frammenti dei risultati di Spark, segui la procedura seguente:
-
Crea un bucket di cache in Amazon S3 e autorizza l'accesso in lettura/scrittura per EMRFS. Per ulteriori informazioni, consulta Autorizzazione di accesso ai dati di EMRFS in Amazon S3.
-
Imposta la configurazione di EMR Spark per abilitare la funzione.
spark.subResultCache.enabled = true spark.subResultCache.fs.root.path = s3://DOC-EXAMPLE-BUCKET/cache_dir/
-
Abilita la gestione del ciclo di vita S3 per il bucket per pulire automaticamente i file della cache.
-
Facoltativamente, configura le proprietà reductionRationThreshold e maxBufferSize per ottimizzare ulteriormente la funzione.
spark.sql.subResultCache.reductionRatioThreshold spark.sql.subResultCache.maxBufferSize
Considerazioni su quando utilizzare il Caching dei frammenti
Il risparmio sui costi quando si utilizzano risultati già memorizzati nella cache in Amazon S3 anziché ricalcolarli aumenta con il numero di volte in cui è possibile utilizzare gli stessi risultati memorizzati nella cache. Le query con scansioni di tabelle di grandi dimensioni seguite da filtri o aggregazioni hash che riducono la dimensione del risultato di un fattore di almeno 8 (vale a dire, un rapporto di almeno 8:1 in dimensione di size:results) trarranno i maggiori vantaggi da questa funzione. Maggiore è il rapporto di riduzione tra input e risultati, maggiore è il rapporto costi-benefici. Ne trarranno vantaggio anche le query con rapporti di riduzione inferiori, ma che contengono passaggi di calcolo costosi tra la scansione della tabella e il filtro o le aggregazioni, purché il costo per produrre i risultati sia maggiore del costo per recuperarli da Amazon S3. Per impostazione predefinita, Caching dei frammenti dei risultati ha effetto solo quando rileva che un rapporto di riduzione sarà almeno di 8:1.
Quando le tue query riutilizzano ripetutamente i risultati memorizzati nella cache, i vantaggi di questa funzione sono maggiori. Le query a finestra continua e incrementali sono buoni esempi. Ad esempio, una query a finestra continua di 30 giorni che è già stata eseguita per 29 giorni, avrebbe solo bisogno di estrarre 1/30 dei dati di destinazione dalla sua origine di input originale e utilizzerebbe frammenti di risultati memorizzati nella cache per i 29 giorni precedenti. Una query a finestra incrementale ne trarrebbe un vantaggio ancora maggiore, poiché l'inizio della finestra rimane fisso: ad ogni invocazione della query, una percentuale minore dell'elaborazione richiederà la lettura dall'origine di input.
Di seguito sono riportate ulteriori considerazioni quando si utilizza il Caching dei frammenti dei risultati:
-
Le query che non hanno come target gli stessi dati con gli stessi frammenti di query avranno una percentuale di riscontri nella cache bassa, quindi non trarranno vantaggio da questa funzione.
-
Le query con rapporti di riduzione bassi che non contengono passaggi di calcolo costosi porteranno a risultati memorizzati nella cache che sono all'incirca tanto costosi da leggere quanto lo erano per l'elaborazione iniziale.
-
La prima query mostrerà sempre una regressione minore a causa del costo di scrittura nella cache.
-
La funzione di memorizzazione nella cache di frammenti risultanti funziona esclusivamente con i file Parquet. Altri formati di file non sono supportati.
-
I buffer della funzione di memorizzazione nella cache di frammenti risultanti tenteranno di memorizzare nella cache solo scansioni con dimensioni di suddivisione dei file pari o superiori a 128 MB. Con la configurazione Spark predefinita, il Caching dei frammenti dei risultati sarà disabilitato se la dimensione di scansione (dimensione totale di tutti i file sottoposti a scansione) divisa per il numero di core esecutori è inferiore a 128 MB. Quando viene impostata una delle configurazioni Spark elencate di seguito, la dimensione della divisione del file sarà:
min(maxPartitionBytes, max(openCostInBytes, scan size / minPartitionNum))
-
spark.sql.leafNodeDefaultParallelism (il valore predefinito è spark.default.parallelism)
-
Spark.sql.files.minPartitionNum (il valore predefinito è spark.sql.leafNodeDefaultParallelism)
-
spark.sql.files.openCostInBytes
-
spark.sql.files.maxPartitionBytes
-
-
La funzione di memorizzazione nella cache di frammenti risultanti memorizza nella cache la granularità della partizione RDD. Il rapporto di riduzione precedentemente descritto che per impostazione predefinita è 8:1 viene valutato per partizione RDD. I carichi di lavoro con rapporti di riduzione per RDD maggiori o inferiori a 8:1 possono riscontrare vantaggi in termini di prestazioni inferiori rispetto ai carichi di lavoro con rapporti di riduzione per RDD costantemente inferiori a 8:1.
-
La funzione di memorizzazione nella cache di frammenti risultanti utilizza un buffer di scrittura da 16 MB per impostazione predefinita per ogni partizione RDD che viene memorizzata nella cache. Se più di 16 MB verranno memorizzati nella cache per partizione RDD, il costo per determinare l'impossibilità di una scrittura può comportare una regressione delle prestazioni.
-
Mentre, per impostazione predefinita, il Caching dei frammenti dei risultati non tenterà di memorizzare nella cache i risultati della partizione RDD con un rapporto di riduzione inferiore a 8:1 e limiterà il suo buffer di scrittura a 16 MB; entrambi questi valori sono regolabili attraverso le seguenti configurazioni:
spark.sql.subResultCache.reductionRatioThreshold (default: 8.0) spark.sql.subResultCache.maxBufferSize (default: 16MB, max: 64MB)
-
Più cluster che utilizzano lo stesso rilascio di EMR possono condividere la stessa posizione della cache. Per garantire la correttezza dei risultati, la memorizzazione nella cache di frammenti risultanti non utilizzerà i risultati della cache scritti da rilasci differenti di Amazon EMR.
-
Il Caching dei frammenti dei risultati verrà disabilitato automaticamente per i casi d'uso di Spark Streaming o quando viene utilizzato RecordServer, Apache Ranger o AWS Lake Formation.
-
Le operazioni di lettura/scrittura nella cache dei frammenti del risultato utilizzano bucket EMRFS e Amazon S3. La crittografia CSE/ SSE S3/ SSE KMS è supportata.