Optimieren der Neustartzeiten von Aufträgen für die Aufgabenwiederherstellung und -skalierung - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Optimieren der Neustartzeiten von Aufträgen für die Aufgabenwiederherstellung und -skalierung

Wenn eine Aufgabe fehlschlägt oder wenn ein Skalierungsvorgang stattfindet, versucht Flink, die Aufgabe vom letzten abgeschlossenen Prüfpunkt aus erneut auszuführen. Die Ausführung des Neustartvorgangs kann eine Minute oder länger dauern, abhängig von der Größe des Prüfpunktzustands und der Anzahl der parallelen Aufgaben. Während des Neustarts können sich Backlog-Aufgaben für den Auftrag ansammeln. Es gibt jedoch einige Möglichkeiten, wie Flink die Geschwindigkeit der Wiederherstellung und des Neustarts von Ausführungsdiagrammen optimiert, um die Auftragsstabilität zu verbessern.

Auf dieser Seite werden einige der Möglichkeiten beschrieben, mit denen Amazon EMR Flink die Zeit für den Neustart des Auftrags während der Aufgabenwiederherstellung oder -skalierung verbessern kann.

Anmerkung

Aufgabenlokale Wiederherstellung wird mit Amazon EMR 6.0.0 und höher unterstützt.

Mit Flink-Prüfpunkten erstellt jede Aufgabe einen Snapshot ihres Status, den Flink in verteilte Speicher wie Amazon S3 schreibt. Im Falle einer Wiederherstellung stellen die Aufgaben ihren Status aus dem verteilten Speicher wieder her. Der verteilte Speicher bietet Fehlertoleranz und kann den Status während der Neuskalierung neu verteilen, da er für alle Knoten zugänglich ist.

Ein verteilter Remote-Speicher hat jedoch auch einen Nachteil: Alle Aufgaben müssen ihren Status von einem entfernten Standort aus über das Netzwerk lesen. Dies kann bei der Aufgabenwiederherstellung oder bei Skalierungsvorgängen zu langen Wiederherstellungszeiten für große Zustände führen.

Dieses Problem der langen Wiederherstellungszeit wird durch eine aufgabenlokale Wiederherstellung gelöst. Aufgaben schreiben ihren Status am Prüfüunkt in einen sekundären Speicher, der sich lokal zur Aufgabe befindet, z. B. auf eine lokale Festplatte. Sie speichern ihren Status auch im Primärspeicher oder in unserem Fall in Amazon S3. Während der Wiederherstellung plant der Scheduler die Aufgaben in demselben Task-Manager, in dem die Aufgaben zuvor ausgeführt wurden, sodass sie aus dem lokalen Statusspeicher wiederhergestellt werden können, anstatt sie aus dem Remote-Statusspeicher zu lesen. Weitere Informationen finden Sie unter Aufgabenlokale Wiederherstellung in der Apache-Flink-Dokumentation.

Unsere Benchmark-Tests mit Beispielaufträgen haben gezeigt, dass die Wiederherstellungszeit bei aktivierter aufgabenlokaler Wiederherstellung von Minuten auf wenige Sekunden reduziert wurde.

Um die aufgabenlokale Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer flink-conf.yaml-Datei fest. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an.

state.backend.local-recovery: true state.backend: hasmap or rocksdb state.checkpoints.dir: s3://storage-location-bucket-path/checkpoint execution.checkpointing.interval: 15000
Anmerkung

Generische protokollbasierte inkrementelle Prüfpunkte werden mit Amazon EMR 6.10.0 und höher unterstützt.

Generische protokollbasierte inkrementelle Prüfpunkte wurden in Flink 1.16 hinzugefügt, um die Geschwindigkeit von Prüfpunkten zu verbessern. Ein schnelleres Prüfpunktintervall führt häufig zu einer Reduzierung des Wiederherstellungsaufwands, da weniger Ereignisse nach der Wiederherstellung erneut verarbeitet werden müssen. Weitere Informationen finden Sie im Apache-Flink-Blog unter Verbesserung der Geschwindigkeit und Stabilität von Prüfpunkten mit generischen protokollbasierten inkrementellen Prüfpunkten.

Unsere Benchmark-Tests haben anhand von Beispielaufträgen gezeigt, dass sich die Prüfpunktzeit mit dem generischen protokollbasierten inkrementellen Prüfpunkt von Minuten auf wenige Sekunden reduziert hat.

Um generische protokollbasierte inkrementelle Prüfpunkte zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei flink-conf.yaml fest. Geben Sie den Wert für das Prüfpunkt-Intervall in Millisekunden an.

state.backend.changelog.enabled: true state.backend.changelog.storage: filesystem dstl.dfs.base-path: s3://bucket-path/changelog state.backend.local-recovery: true state.backend: rocksdb state.checkpoints.dir: s3://bucket-path/checkpoint execution.checkpointing.interval: 15000
Anmerkung

Eine differenzierte Wiederherstellungsunterstützung für den Standard-Scheduler ist mit Amazon EMR 6.0.0 und höher verfügbar. Eine differenzierte Wiederherstellungsunterstützung im adaptiven Scheduler ist mit Amazon EMR 6.15.0 und höher verfügbar.

Wenn eine Aufgabe während der Ausführung fehlschlägt, setzt Flink das gesamte Ausführungsdiagramm zurück und löst eine vollständige Neuausführung ab dem letzten abgeschlossenen Prüfpunkt aus. Das ist teurer, als nur die fehlgeschlagenen Aufgaben erneut auszuführen. Bei einer differenzierten Wiederherstellung wird nur die mit der Pipeline verbundene Komponente der fehlgeschlagenen Aufgabe neu gestartet. Im folgenden Beispiel hat das Auftragsdiagramm 5 Scheitelpunkte (A bis E). Alle Verbindungen zwischen den Scheitelpunkten werden punktweise in Pipelines verlegt, und der Wert parallelism.default für den Auftrag ist auf 2 eingestellt.

A → B → C → D → E

In diesem Beispiel werden insgesamt 10 Aufgaben ausgeführt. Die erste Pipeline (a1 bis e1) wird in einem TaskManager (TM1) ausgeführt, während die zweite Pipeline (a2 bis e2) in einem anderen TaskManager (TM2) ausgeführt wird.

a1 → b1 → c1 → d1 → e1 a2 → b2 → c2 → d2 → e2

Es gibt zwei Komponenten, die über eine Pipeline miteinander verbunden sind: a1 → e1 und a2 → e2. Wenn entweder TM1 oder TM2 fehlschlägt, wirkt sich der Fehler nur auf die 5 Aufgaben in der Pipeline aus, in denen der TaskManager ausgeführt wurde. Bei der Neustartstrategie wird nur die betroffene Pipeline-Komponente gestartet.

Eine differenzierte Wiederherstellung funktioniert nur mit perfekt parallelen Flink-Aufträgen. Sie wird nicht mit keyBy()- oder redistribute()-Vorgängen unterstützt. Weitere Informationen finden Sie unter FLIP-1: Fine Grained Recovery from Task Failures (FLIP-1: Differenzierte Wiederherstellung nach Aufgabenfehlern) im Jira-Projekt Flink Improvement Proposal.

Um die differenzierte Wiederherstellung zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer flink-conf.yaml-Datei fest.

jobmanager.execution.failover-strategy: region restart-strategy: exponential-delay or fixed-delay
Anmerkung

Der kombinierte Neustartmechanismus im adaptiven Scheduler wird mit Amazon EMR 6.15.0 und höher unterstützt.

Der adaptive Scheduler kann die Parallelität des Auftrags auf der Grundlage der verfügbaren Slots anpassen. Er reduziert automatisch die Parallelität, wenn nicht genügend Slots für die konfigurierte Auftragsparallelität verfügbar sind. Wenn neue Slots verfügbar werden, wird der Auftrag wieder auf die konfigurierte Auftragsparallelität hochskaliert. Ein adaptiver Scheduler vermeidet Ausfallzeiten beim Auftrag, wenn nicht genügend Ressourcen verfügbar sind. Dies ist der unterstützte Scheduler für Flink Autoscaler. Aus diesen Gründen empfehlen wir den adaptiven Scheduler mit Amazon EMR Flink. Adaptive Scheduler können jedoch innerhalb kurzer Zeit mehrere Neustarts durchführen, und zwar einen Neustart für jede neu hinzugefügte Ressource. Dies könnte zu einem Leistungsabfall des Auftrags führen.

Mit Amazon EMR 6.15.0 und höher verfügt Flink über einen kombinierten Neustartmechanismus im adaptiven Scheduler, der ein Neustartfenster öffnet, wenn die erste Ressource hinzugefügt wird, und dann bis zum konfigurierten Fensterintervall von 1 Minute wartet. Er führt einen einzigen Neustart durch, wenn genügend Ressourcen zur Verfügung stehen, um den Auftrag mit konfigurierter Parallelität auszuführen, oder wenn das Intervall abgelaufen ist.

Unsere Benchmark-Tests haben anhand von Beispielaufträgen gezeigt, dass dieses Feature 10 % mehr Datensätze verarbeitet als das Standardverhalten, wenn Sie den adaptiven Scheduler und Flink Autoscaler verwenden.

Um den kombinierten Neustartmechanismus zu aktivieren, legen Sie die folgenden Konfigurationen in Ihrer Datei flink-conf.yaml fest.

jobmanager.adaptive-scheduler.combined-restart.enabled: true jobmanager.adaptive-scheduler.combined-restart.window-interval: 1m