Ottimizzazione dei tempi di riavvio dei processi per le operazioni di ripristino delle attività e dimensionamento - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Ottimizzazione dei tempi di riavvio dei processi per le operazioni di ripristino delle attività e dimensionamento

Quando un'attività non riesce o quando si verifica un'operazione di dimensionamento, Flink tenta di rieseguire l'attività dall'ultimo checkpoint completato. L'esecuzione del processo di riavvio potrebbe richiedere un minuto o più, a seconda delle dimensioni dello stato del checkpoint e del numero di attività parallele. Durante il periodo di riavvio, le attività di backlog relative al processo possono accumularsi. Esistono tuttavia alcuni modi in cui Flink ottimizza la velocità di ripristino e riavvio dei grafici di esecuzione per migliorare la stabilità del processo.

Questa pagina descrive alcuni dei modi in cui Amazon EMR Flink può migliorare il tempo di riavvio del processo durante le operazioni di ripristino delle attività o dimensionamento.

Nota

Il ripristino locale delle attività è supportato con Amazon EMR 6.0.0 e versioni successive.

Con i checkpoint Flink, ogni attività produce uno snapshot del suo stato che Flink scrive in un'archiviazione distribuita come Amazon S3. In caso di ripristino, le attività recuperano il loro stato dall'archiviazione distribuita. L'archiviazione distribuita offre tolleranza ai guasti e può ridistribuire lo stato durante il dimensionamento perché è accessibile a tutti i nodi.

Tuttavia, un archivio distribuito remoto presenta anche uno svantaggio: tutte le attività devono leggere il proprio stato da una posizione remota della rete. Ciò può comportare lunghi tempi di ripristino per stati di grandi dimensioni durante le operazioni di ripristino delle attività o di dimensionamento.

Il problema dei lunghi tempi di ripristino viene risolto mediante il ripristino locale delle attività. Le attività scrivono il loro stato su checkpoint in una memoria secondaria locale all'attività, ad esempio su un disco locale. Inoltre, memorizzano il loro stato nell'archiviazione principale, o su Amazon S3, come nel nostro caso. Durante il ripristino, il pianificatore programma le attività sullo stesso gestore delle attività in cui le attività erano state eseguite in precedenza, in modo che possano essere ripristinate dall'archivio di stato locale anziché essere lette dall'archivio di stato remoto. Per ulteriori informazioni, consulta l'argomento relativo al ripristino locale delle attività nella documentazione di Apache Flink.

I nostri test di benchmark con processi di esempio hanno dimostrato che il tempo di ripristino è stato ridotto da pochi minuti a pochi secondi con il ripristino locale delle attività abilitato.

Per abilitare il ripristino locale delle attività, imposta le seguenti configurazioni nel file flink-conf.yaml. Specifica il valore dell'intervallo di checkpoint in millisecondi.

state.backend.local-recovery: true state.backend: hasmap or rocksdb state.checkpoints.dir: s3://storage-location-bucket-path/checkpoint execution.checkpointing.interval: 15000
Nota

Il checkpoint incrementale generico basato su log è supportato con Amazon EMR 6.10.0 e versioni successive.

Il checkpoint incrementale generico basato su log è stato aggiunto in Flink 1.16 per migliorare la velocità dei checkpoint. Un intervallo di checkpoint più rapido spesso comporta una riduzione del lavoro di ripristino perché è necessario rielaborare un minor numero di eventi dopo il ripristino. Per ulteriori informazioni, consulta Improving speed and stability of checkpointing with generic log-based incremental checkpoints sul blog di Apache Flink.

Con processi di esempio, i nostri test di benchmark hanno dimostrato che il tempo di checkpoint si è ridotto da pochi minuti a pochi secondi con il checkpoint incrementale generico basato su log.

Per abilitare i checkpoint incrementali generici basati su log, imposta le seguenti configurazioni nel tuo file flink-conf.yaml. Specifica il valore dell'intervallo di checkpoint in millisecondi.

state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://bucket-path/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path/checkpoint execution.checkpointing.interval: 15000
Nota

Il supporto per il ripristino granulare nel pianificatore predefinito è supportato con Amazon EMR 6.0.0 e versioni successive. Il supporto per il ripristino granulare nel pianificatore adattivo è disponibile con Amazon EMR 6.15.0 e versioni successive.

Quando un'attività riporta un errore durante l'esecuzione, Flink reimposta l'intero grafico di esecuzione e attiva una riesecuzione completa dall'ultimo checkpoint completato. Questa procedura è più costosa della semplice riesecuzione delle attività non riuscite. Il ripristino granulare riavvia solo il componente connesso alla pipeline dell'attività non riuscita. Nell'esempio seguente, il grafico del processo ha 5 vertici (da A a E). Tutte le connessioni tra i vertici avvengono tramite pipeline con distribuzione uniforme e il comando parallelism.default per il processo è impostato su 2.

A → B → C → D → E

Per questo esempio, le attività totali in esecuzione sono 10. La prima pipeline (da a1 a e1) viene eseguita su un TaskManager (TM1) e la seconda pipeline (da a2 a e2) viene eseguita su un altro TaskManager (TM2).

a1 → b1 → c1 → d1 → e1 a2 → b2 → c2 → d2 → e2

Esistono due componenti collegati tramite pipeline: a1 → e1 e a2 → e2. Se TM1 o TM2 restituiscono un errore, l'errore influisce solo sulle 5 attività della pipeline in cui TaskManager era in esecuzione. La strategia di riavvio avvia solo il componente della pipeline interessato.

Il ripristino granulare funziona solo con processi Flink perfettamente paralleli. Non è supportato con le operazioni keyBy() o redistribute(). Per ulteriori informazioni, consulta FLIP-1: Fine Grained Recovery from Task Failures nel progetto Jira Flink Improvement Proposal.

Per abilitare il ripristino granulare, imposta le seguenti configurazioni nel file flink-conf.yaml.

jobmanager.execution.failover-strategy: region restart-strategy: exponential-delay or fixed-delay
Nota

Il meccanismo di riavvio combinato nel pianificatore adattivo è supportato con Amazon EMR 6.15.0 e versioni successive.

Il pianificatore adattivo può regolare il parallelismo del processo in base agli slot disponibili. Riduce automaticamente il parallelismo se non sono disponibili abbastanza slot per soddisfare il parallelismo configurato del processo. Se diventano disponibili nuovi slot, il processo viene nuovamente dimensionato in base al parallelismo configurato del processo. Un pianificatore adattivo evita i tempi di inattività del processo quando le risorse disponibili non sono sufficienti. Questo è il pianificatore supportato per Autoscaler di Flink. Per questi motivi, con Flink di Amazon EMR consigliamo il pianificatore adattivo. Tuttavia, i pianificatori adattivi potrebbero eseguire più riavvii in un breve periodo di tempo, un riavvio per ogni nuova risorsa aggiunta. Questo potrebbe comportare un calo delle prestazioni nel processo.

Con Amazon EMR 6.15.0 e versioni successive, Flink dispone di un meccanismo di riavvio combinato nel pianificatore adattivo che apre una finestra di riavvio quando viene aggiunta la prima risorsa e quindi attende fino all'intervallo di finestra configurato di 1 minuto predefinito. Esegue un singolo riavvio quando sono disponibili risorse sufficienti per eseguire il processo con il parallelismo configurato o quando scade l'intervallo.

Con processi di esempio, i nostri test di benchmark hanno dimostrato che questa funzionalità elabora il 10% dei record in più rispetto al comportamento predefinito quando si utilizzano pianificatori adattivi e autoscaler di Flink.

Per abilitare il meccanismo di riavvio combinato, imposta le seguenti configurazioni nel file flink-conf.yaml.

jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m