Verwenden der Hochverfügbarkeit (HA) für Flink-Operatoren und Flink-Anwendungen - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden der Hochverfügbarkeit (HA) für Flink-Operatoren und Flink-Anwendungen

Wir ermöglichen Hochverfügbarkeit für den Flink-Operator, sodass wir auf einen Standby-Flink-Operator umschalten können, um Ausfallzeiten im Regelkreis des Bedieners zu minimieren, falls Ausfälle auftreten. Hochverfügbarkeit ist standardmäßig aktiviert, und die Standardanzahl der Startoperatorreplikate ist 2. Sie können das Feld Replicas in Ihrer values.yaml-Datei für das Helm-Chart konfigurieren.

Die folgenden Felder sind anpassbar:

  • replicas (optional, Standardeinstellung ist 2): Wenn Sie diese Zahl auf einen Wert über 1 setzen, werden weitere Standby-Operatoren erstellt und Ihr Auftrag kann schneller wiederhergestellt werden.

  • highAvailabilityEnabled (optional, der Standardwert ist true): Steuert, ob Sie HA aktivieren möchten. Wenn Sie diesen Parameter auf true angeben, wird die Unterstützung für Multi-AZ-Bereitstellungen aktiviert und die richtigen flink-conf.yaml-Parameter festgelegt.

Sie können HA für Ihren Betreiber deaktivieren, indem Sie die folgende Konfiguration in Ihrer values.yaml-Datei festlegen.

... imagePullSecrets: [] replicas: 1 # set this to false if you don't want HA highAvailabilityEnabled: false ...

Multi-AZ-Bereitstellung

Wir erstellen die Operator-Pods in mehreren Availability Zones. Dabei handelt es sich um eine weiche Einschränkung, und Ihre Operator-Pods werden in derselben AZ geplant, falls Sie nicht über genügend Ressourcen in einer anderen AZ verfügen.

Bestimmung des Leader-Replikats

Wenn HA aktiviert ist, ermitteln die Replikate anhand eines Leases, welcher der JMs der Leader ist, und verwenden einen K8s-Lease für die Auswahl des Leaders. Sie können den Lease beschreiben und anhand des Felds .Spec.Holder Identity den aktuellen Leader ermitteln

kubectl describe lease <Helm Install Release Name>-<NAMESPACE>-lease -n <NAMESPACE> | grep "Holder Identity"

Flink-S3-Interaktion

Konfigurieren von Zugriffs-Anmeldeinformationen

Bitte stellen Sie sicher, dass Sie IRSA mit den entsprechenden IAM-Berechtigungen für den Zugriff auf den S3-Bucket konfiguriert haben.

Auftrag-Jars werden aus dem S3-Anwendungsmodus abgerufen

Der Flink-Operator unterstützt auch das Abrufen von Anwendungs-Jars aus S3. Sie geben einfach den S3-Speicherort für den jarURI in Ihrer FlinkDeployment Spezifikation an.

Sie können diese Funktion auch verwenden, um andere Artefakte wie PyFlink Skripte herunterzuladen. Das resultierende Python-Skript wird unter dem Pfad /opt/flink/usrlib/ abgelegt.

Das folgende Beispiel zeigt, wie Sie diese Funktion für einen PyFlink Auftrag verwenden. Beachten Sie die Felder jarURI und args.

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: image: <YOUR CUSTOM PYFLINK IMAGE> emrReleaseLabel: "emr-6.12.0-flink-latest" flinkVersion: v1_16 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" serviceAccount: flink jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: "s3://<S3-BUCKET>/scripts/pyflink.py" # Note, this will trigger the artifact download process entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-pyclientexec", "/usr/local/bin/python3", "-py", "/opt/flink/usrlib/pyflink.py"] parallelism: 1 upgradeMode: stateless

Flink-S3-Konnektoren

Flink wird mit zwei S3-Konnektoren geliefert (unten aufgeführt). In den folgenden Abschnitten wird erläutert, wann welcher Anschluss verwendet werden sollte.

Checkpointing: Presto-S3-Konnektoren

  • S3-Schema auf s3p://setzen

  • Der empfohlene Konnektor für den Checkpoint zu s3.

FlinkDeployment Beispielspezifikation:

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" state.checkpoints.dir: s3p://<UCKET-NAME>/flink-checkpoint/
  • S3-Schema auf s3:// oder (s3a://) setzen

  • Der empfohlene Konnektor zum Lesen und Schreiben von Dateien aus S3 (einziger S3-Konnektor, der die Flinks-Dateisystemschnittstelle implementiert).

  • Standardmäßig legen wir fs.s3a.aws.credentials.provider in der flink-conf.yaml-Datei fest, die com.amazonaws.auth.WebIdentityTokenCredentialsProvider ist. Wenn Sie die Standardeinstellung flink-conf vollständig überschreiben und mit S3 interagieren, stellen Sie sicher, dass Sie diesen Anbieter verwenden.

FlinkDeployment Beispielspezifikation

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: job: jarURI: local:///opt/flink/examples/streaming/WordCount.jar args: [ "--input", "s3a://<INPUT BUCKET>/PATH", "--output", "s3a://<OUTPUT BUCKET>/PATH" ] parallelism: 2 upgradeMode: stateless

Hochverfügbarkeit (HA) für Flink-Bereitstellungen ermöglicht es Aufträgen, weiterhin Fortschritte zu machen, auch wenn ein vorübergehender Fehler auftritt und Ihre JobManager abstürzt. Die Aufträge werden neu gestartet, jedoch ab dem letzten erfolgreichen Checkpoint mit aktivierter HA. Ohne aktiviertes HA startet Kubernetes Ihr neu JobManager, aber Ihr Auftrag wird als neuer Auftrag gestartet und verliert seinen Fortschritt. Nach der Konfiguration von HA können wir Kubernetes anweisen, die HA-Metadaten in einem persistenten Speicher zu speichern, auf den im Falle eines vorübergehenden Ausfalls in der verwiesen werden soll, JobManager und dann unsere Aufträge ab dem letzten erfolgreichen Checkpoint fortzusetzen.

HA ist standardmäßig für Ihre Flink-Aufträge aktiviert (die Anzahl der Replikate ist auf 2 festgelegt, sodass Sie einen S3-Speicherort angeben müssen, damit HA-Metadaten bestehen bleiben).

HA-Konfigurationen

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: basic-example spec: flinkConfiguration: taskmanager.numberOfTaskSlots: "2" executionRoleArn: "<JOB EXECUTION ROLE ARN>" emrReleaseLabel: "emr-6.13.0-flink-latest" jobManager: resource: memory: "2048m" cpu: 1 replicas: 2 highAvailabilityEnabled: true storageDir: "s3://<S3 PERSISTENT STORAGE DIR>" taskManager: resource: memory: "2048m" cpu: 1

Im Folgenden finden Sie Beschreibungen für die oben genannten HA-Konfigurationen in JobManager (definiert unter .spec.JobManager):

  • highAvailabilityEnabled (optional, der Standardwert ist true): Setzen Sie diesen Wert auf false , wenn Sie HA nicht aktivieren und die bereitgestellten HA-Konfigurationen nicht verwenden möchten. Sie können das Feld „Replicas“ immer noch bearbeiten, um HA manuell zu konfigurieren.

  • replicas (optional, Standard ist 2): Wenn Sie diese Zahl auf größer als 1 setzen, werden andere Standby-Vorgänge erstellt JobManagers und Ihr Auftrag wird schneller wiederhergestellt. Wenn Sie HA deaktivieren, müssen Sie die Anzahl der Replikate auf 1 setzen, sonst erhalten Sie weiterhin Validierungsfehler (nur 1 Replikat wird unterstützt, wenn HA nicht aktiviert ist).

  • storageDir (erforderlich): Da wir die Anzahl der Replikate standardmäßig auf 2 setzen, müssen wir ein persistentes StorageDir bereitstellen. Derzeit akzeptiert dieses Feld nur S3-Pfade als Speicherort.

Pod-Lokalität

Wenn Sie HA aktivieren, versuchen wir auch, Pods in derselben AZ zusammenzufassen, was zu einer verbesserten Leistung führt (geringere Netzwerklatenz durch Pods in denselben AZs). Dabei handelt es sich um einen bestmöglichen Prozess. Wenn Sie also nicht über genügend Ressourcen in der AZ verfügen, in der die meisten Ihrer Pods geplant sind, werden die verbleibenden Pods zwar geplant, landen aber möglicherweise auf einem Knoten außerhalb dieser AZ.

Bestimmung des Leader-Replikats

Wenn HA aktiviert ist, ermitteln die Replikate anhand eines Leases, welches der JMs das Leader ist, und verwenden eine K8s-Configmap als Datenspeicher zum Speichern dieser Metadaten. Wenn Sie den Leader ermitteln möchten, können Sie sich den Inhalt der Configmap und den Schlüssel org.apache.flink.k8s.leader.restserver unter Daten ansehen, um den K8s-Pod mit der IP-Adresse zu finden. Sie können auch die folgenden Bash-Befehle verwenden.

ip=$(kubectl get configmap -n <NAMESPACE> <JOB-NAME>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"

Amazon EMR 6.13.0 und höher unterstützt Flink Native Kubernetes für die Ausführung von Flink-Anwendungen im Hochverfügbarkeitsmodus auf einem Amazon-EKS-Cluster.

Anmerkung

Sie müssen einen Amazon-S3-Bucket erstellt haben, um die Hochverfügbarkeitsmetadaten zu speichern, wenn Sie Ihren Flink-Auftrag einreichen. Wenn Sie dieses Feature nicht verwenden möchten, können Sie sie deaktivieren. Sie ist standardmäßig aktiviert.

Um die Flink-Hochverfügbarkeitsfunktion zu aktivieren, geben Sie die folgenden Flink-Parameter an, wenn Sie den run-application-CLI-Befehl ausführen. Die Parameter sind unter dem Beispiel definiert.

-Dhigh-availability.type=kubernetes \ -Dhigh-availability.storageDir=S3://DOC-EXAMPLE-STORAGE-BUCKET \ -Dfs.s3a.aws.credentials.provider="com.amazonaws.auth.WebIdentityTokenCredentialsProvider" \ -Dkubernetes.jobmanager.replicas=3 \ -Dkubernetes.cluster-id=example-cluster
  • Dhigh-availability.storageDir – Ein S3-Bucket, in dem Sie die Ergebnisse dieser Anforderung speichern möchten.

    Dkubernetes.jobmanager.replicas – Die Anzahl der Job-Manager-Pods, die als Ganzzahl größer als 1 erstellt werden sollen.

    Dkubernetes.cluster-id – Eine eindeutige ID, die den Flink-Cluster identifiziert.