Konfiguration von Flink in Amazon EMR - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Konfiguration von Flink in Amazon EMR

EMRAmazon-Versionen 6.9.0 und höher unterstützen sowohl Hive Metastore als auch AWS Glue Catalog mit dem Apache Flink-Konnektor zu Hive. In diesem Abschnitt werden die Schritte beschrieben, die zur Konfiguration von AWS Glue Catalog und Hive Metastore mit Flink erforderlich sind.

  1. Erstellen Sie einen EMR Cluster mit Version 6.9.0 oder höher und mindestens zwei Anwendungen: Hive und Flink.

  2. Verwenden Sie Script Runner, um das folgende Skript als Schrittfunktion auszuführen:

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.
  1. Erstellen Sie einen EMR Cluster mit Version 6.9.0 oder höher und mindestens zwei Anwendungen: Hive und Flink.

  2. Wählen Sie in den Einstellungen des AWS Glue Data Catalogs die Option Für Hive-Tabellenmetadaten verwenden aus, um den Data Catalog im Cluster zu aktivieren.

  3. Verwenden Sie Script Runner, um das folgende Skript als Schrittfunktion auszuführen: Befehle und Skripts auf einem EMR Amazon-Cluster ausführen:

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.

Sie können die EMR Amazon-Konfiguration verwendenAPI, um Flink mit einer Konfigurationsdatei zu konfigurieren. Die Dateien, die innerhalb von konfiguriert werden können, API sind:

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

Die Hauptkonfigurationsdatei für Flink heißt flink-conf.yaml.

So konfigurieren Sie die Anzahl der Aufgaben-Slots für Flink mithilfe der AWS CLI
  1. Erstellen Sie eine Datei, configurations.json, mit folgendem Inhalt:

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. Im nächsten Schritt erstellen Sie einen Cluster mit der folgenden Konfiguration:

    aws emr create-cluster --release-label emr-7.3.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
Anmerkung

Sie können einige Konfigurationen auch mit dem Flink API ändern. Weitere Informationen finden Sie unter Konzepte in der Flink-Dokumentation.

Mit EMR Amazon-Version 5.21.0 und höher können Sie Cluster-Konfigurationen überschreiben und zusätzliche Konfigurationsklassifizierungen für jede Instance-Gruppe in einem laufenden Cluster angeben. Dazu verwenden Sie die EMR Amazon-Konsole, die AWS Command Line Interface (AWS CLI) oder die AWS SDK. Weitere Informationen finden Sie unter Angeben einer Konfiguration für eine Instance-Gruppe in einem aktiven Cluster.

Als Eigentümer Ihrer Anwendung wissen Sie am besten, welche Ressourcen Aufgaben innerhalb von Flink zugewiesen werden müssen. Für Beispiele in dieser Dokumentation verwenden Sie die gleiche Anzahl von Aufgaben wie die Aufgaben-Instances, die Sie für die Anwendung nutzen. Wir empfehlen diese Vorgehensweise generell für die anfängliche Parallelität. Sie können jedoch die Granularität der Parallelität mithilfe von Aufgaben-Slots erhöhen. Dabei sollte die Anzahl von virtuellen Cores pro Instance im Allgemeinen nicht überschritten werden. Weitere Informationen über die Architektur von Flink finden Sie unter Konzepte in der Flink-Dokumentation.

Der JobManager von Flink bleibt während des Failover-Prozesses für den primären Knoten in einem EMR Amazon-Cluster mit mehreren Primärknoten verfügbar. Ab Amazon EMR 5.28.0 ist JobManager Hochverfügbarkeit auch automatisch aktiviert. Es ist keine manuelle Konfiguration erforderlich.

Bei EMR Amazon-Versionen 5.27.0 oder früher JobManager handelt es sich um einen einzigen Fehlerpunkt. Wenn der JobManager fehlschlägt, gehen alle Jobstatus verloren und die laufenden Jobs werden nicht wieder aufgenommen. Sie können JobManager Hochverfügbarkeit aktivieren, indem Sie die Anzahl der Anwendungsversuche, das Checkpointing und die Aktivierung ZooKeeper als Statusspeicher für Flink konfigurieren, wie das folgende Beispiel zeigt:

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

Sie müssen sowohl die maximale Anzahl der Anwendungsmasterversuche für Flink als auch die maximalen Anwendungsversuche für YARN Flink konfigurieren. Weitere Informationen finden Sie unter Konfiguration der Hochverfügbarkeit von YARN Clustern. Möglicherweise möchten Sie auch das Flink-Checkpointing so konfigurieren, dass neu gestartete Jobs von zuvor abgeschlossenen Checkpoints JobManager wiederhergestellt werden. Weitere Informationen finden Sie unter Flink-Checkpointing.

Für EMR Amazon-Versionen, die Flink 1.11.x verwenden, müssen Sie die Gesamtspeicherprozessgröße sowohl für () als auch für JobManager (jobmanager.memory.process.size) in konfigurieren. TaskManager taskmanager.memory.process.size flink-conf.yaml Sie können diese Werte festlegen, indem Sie entweder den Cluster mit der Konfiguration konfigurieren API oder diese Felder manuell über auskommentieren. SSH Flink bietet die folgenden Standardwerte.

  • jobmanager.memory.process.size: 1 600 m

  • taskmanager.memory.process.size: 1 728 m

Um JVM Metaspace und Overhead auszuschließen, verwenden Sie stattdessen die gesamte Flink-Speichergröße (). taskmanager.memory.flink.size taskmanager.memory.process.size Der Standardwert von taskmanager.memory.process.size beträgt 1 280 m. Es wird nicht empfohlen, sowohl taskmanager.memory.process.size als auch taskmanager.memory.process.size zu setzen.

Alle EMR Amazon-Versionen, die Flink 1.12.0 und höher verwenden, haben die im Open-Source-Set für Flink aufgeführten Standardwerte als Standardwerte bei AmazonEMR, sodass Sie sie nicht selbst konfigurieren müssen.

Flink-Anwendungscontainer erstellen drei Arten von Protokolldateien und schreiben in sie: .out-Dateien, .log-Dateien und .err-Dateien. Nur .err-Dateien werden komprimiert und aus dem Dateisystem entfernt, während .log- und .out-Protokolldateien im Dateisystem verbleiben. Um sicherzustellen, dass diese Ausgabedateien verwaltbar bleiben und der Cluster stabil bleibt, können Sie die Protokollrotation in log4j.properties so konfigurieren, dass eine maximale Anzahl von Dateien festgelegt und deren Größe begrenzt wird.

EMRAmazon-Versionen 5.30.0 und höher

Ab Amazon EMR 5.30.0 verwendet Flink das Logging-Framework log4j2 mit dem Namen der Konfigurationsklassifikation. Die folgende Beispielkonfiguration demonstriert flink-log4j. das log4j2-Format.

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

EMRAmazon-Versionen 5.29.0 und früher

In den EMR Amazon-Versionen 5.29.0 und früher verwendet Flink das Logging-Framework log4j. Die folgende Beispielkonfiguration veranschaulicht das log4j Format.

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

EMRAmazon-Versionen 6.12.0 und höher bieten Java 11-Laufzeitunterstützung für Flink. In den folgenden Abschnitten wird beschrieben, wie der Cluster so konfiguriert wird, dass er Java-11-Laufzeitunterstützung für Flink bereitstellt.

Gehen Sie wie folgt vor, um einen EMR Cluster mit Flink und Java 11-Runtime zu erstellen. Die Konfigurationsdatei, in der Sie die Java-11-Laufzeitunterstützung hinzufügen, befindet sich. flink-conf.yaml

Console
Um einen Cluster mit Flink und Java 11-Laufzeit in der Konsole zu erstellen
  1. Melden Sie sich bei der AWS Management Console an und öffnen Sie die EMR Amazon-Konsole unter https://console.aws.amazon.com/emr.

  2. Wählen Sie EC2 im Navigationsbereich EMRunterCluster“ und anschließend „Cluster erstellen“ aus.

  3. Wählen Sie EMR Amazon-Version 6.12.0 oder höher und installieren Sie die Flink-Anwendung. Wählen Sie alle anderen Anwendungen aus, die Sie auf Ihrem Cluster installieren möchten.

  4. Fahren Sie mit der Einrichtung Ihres Clusters fort. Verwenden Sie im Bereich optionale Softwareeinstellungen die Standardoption Konfiguration eingeben und geben Sie die folgende Konfiguration ein:

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. Fahren Sie mit der Einrichtung und dem Start Ihres Clusters fort.

AWS CLI
Um einen Cluster mit Flink und Java 11-Runtime aus dem zu erstellen CLI
  1. Erstellen Sie eine Konfigurationsdatei configurations.json, die Flink für die Verwendung von Java 11 konfiguriert.

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. Erstellen Sie aus dem AWS CLI einen neuen EMR Cluster mit EMR Amazon-Version 6.12.0 oder höher und installieren Sie die Flink-Anwendung, wie im folgenden Beispiel gezeigt:

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

Gehen Sie wie folgt vor, um einen laufenden EMR Cluster mit Flink und Java 11-Runtime zu aktualisieren. Die Konfigurationsdatei, in der Sie die Java-11-Laufzeitunterstützung hinzufügen, befindet sich. flink-conf.yaml

Console
Um einen laufenden Cluster mit Flink und Java 11-Runtime in der Konsole zu aktualisieren
  1. Melden Sie sich bei der AWS Management Console an und öffnen Sie die EMR Amazon-Konsole unter https://console.aws.amazon.com/emr.

  2. Wählen Sie EC2 im Navigationsbereich unter EMROn die Option Cluster aus und wählen Sie dann den Cluster aus, den Sie aktualisieren möchten.

    Anmerkung

    Der Cluster muss Amazon EMR Release 6.12.0 oder höher verwenden, um Java 11 zu unterstützen.

  3. Wählen Sie die Registerkarte Konfigurationen aus.

  4. Wählen Sie im Abschnitt Instance-Gruppenkonfigurationen die Instance-Gruppe Running aus, die Sie aktualisieren möchten, und wählen Sie dann Neukonfiguration aus dem Menü mit den Listenaktionen aus.

  5. Konfigurieren Sie die Instance-Gruppe mit der Option Attribute bearbeiten wie folgt neu. Wählen Sie nach jeder Konfiguration Neue Konfiguration hinzufügen aus.

    Klassifizierung Eigenschaft Wert

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. Wählen Sie Änderungen speichern aus um die Konfigurationen hinzuzufügen. .

AWS CLI
Um einen laufenden Cluster so zu aktualisieren, dass er Flink und Java 11-Runtime von der CLI

Verwenden Sie den Befehl modify-instance-groups, um eine neue Konfiguration für eine Instance-Gruppe in einem laufenden Cluster anzugeben.

  1. Erstellen Sie eine Konfigurationsdatei configurations.json, die Flink für die Verwendung von Java 11 konfiguriert. Ersetzen Sie im folgenden Beispiel ig-1xxxxxxx9 durch die ID für die Instanzgruppe, die Sie neu konfigurieren möchten. Speichern Sie die Datei im selben Verzeichnis, in dem Sie den modify-instance-groups-Befehl ausführen werden.

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. Führen Sie von der AWS CLI aus den folgenden Befehl aus. Ersetzen Sie die ID für die Instance-Gruppe, die Sie neu konfigurieren möchten:

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

Um die Java-Laufzeit für einen laufenden Cluster zu ermitteln, melden Sie sich beim primären Knoten an, SSH wie unter Connect zum Primärknoten herstellen mit beschriebenSSH. Führen Sie anschließend den folgenden Befehl aus:

ps -ef | grep flink

Der ps-Befehl mit der -ef-Option listet alle laufenden Prozesse auf dem System auf. Sie können diese Ausgabe mit grep filtern, um Erwähnungen der Zeichenfolge flink zu finden. Überprüfen Sie die Ausgabe für den Wert Java Runtime Environment (JRE),jre-XX. In der folgenden Ausgabe wird jre-11 angegeben, dass Java 11 zur Laufzeit von Flink abgerufen wird.

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

Alternativ können Sie sich mit dem Befehl flink-yarn-session -d beim primären Knoten anmelden SSH und eine YARN Flink-Sitzung starten. Die Ausgabe zeigt die Java Virtual Machine (JVM) für Flink, java-11-amazon-corretto im folgenden Beispiel:

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64