在 Amazon EMR 中設定 Flink - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Amazon EMR 中設定 Flink

Amazon EMR 發布 6.9.0 及更高版本支持蜂巢中繼存儲和 AWS Glue 目錄與 Apache Flink 連接器到蜂巢。本章節概述了使用 Flink 設定 AWS Glue CatalogHive 中繼存放區所需的步驟。

  1. 建立具有 6.9.0 版或更高版本的 EMR 叢集,以及至少兩個應用程式:Hive 和 Flink

  2. 使用指令碼執行器將下列指令碼作為 Step Functions 執行:

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
  1. 建立具有 6.9.0 版或更高版本的 EMR 叢集,以及至少兩個應用程式:Hive 和 Flink

  2. 在 AWS Glue Data Catalog 設定中選取用於 Hive 資料表中繼資料,以在叢集中啟用 Data Catalog。

  3. 使用指令碼執行器將下列指令碼作為 Step Functions 執行:在 Amazon EMR 叢集上執行命令和指令碼

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar

您可以使用 Amazon EMR 組態 API 透過組態檔案設定 Flink。可在 API 內設定的檔案包括:

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

Flink 的主要組態檔案名為 flink-conf.yaml

從 AWS CLI設定用於 Flink 的任務位置數量
  1. 使用下列內容建立檔案 configurations.json

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. 再以下列組態建立叢集:

    aws emr create-cluster --release-label emr-7.1.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
注意

您還可以使用 Flink API 變更部分組態。如需詳細資訊,請參閱 Flink 文件中的概念

對於 Amazon EMR 版本 5.21.0 及更高版本,您可以覆寫叢集組態,並且為執行中叢集的每個執行個體群組,指定額外組態分類。您可以使用 Amazon EMR 主控台、 AWS Command Line Interface (AWS CLI) 或 AWS 開發套件來執行此操作。如需詳細資訊,請參閱為執行中叢集的執行個體群組提供組態

作為自己應用程式的擁有者,您最清楚應在 Flink 內將哪些資源指派給任務。如需本文中的範例,請使用與您用於應用程式的任務執行個體相同數量的任務。一般而言,我們會建議在執行初始等級的平行處理時採用此設定,但您也可以用任務位置來增加平行處理的精細度;通常不應超過每個執行個體的虛擬核心數量。如需有關 Flink 架構的詳細資訊,請參閱 Flink 文件中的概念

在具有多個主節點 JobManager 的 Amazon EMR 叢集中的主節點容錯移轉程序期間,Flink 仍然可以使用。從 Amazon EMR 5.28.0 開始,也會自動啟用 JobManager 高可用性。不需要手動設定。

使用 Amazon EMR 5.27.0 或更早版本, JobManager 就是單一故障點。失 JobManager 敗時,它會遺失所有工作狀態,而且不會繼續執行中的工作。您可以透過設定應用程式嘗試計數、檢查點和啟用 ZooKeeper 為 Flink 的狀態儲存來啟用 JobManager 高可用性,如下列範例所示:

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

您必須為 YARN 設定最大應用程式主控嘗試次數,以及為 Flink 設定應用程式嘗試次數。如需詳細資訊,請參閱 YARN 叢集高可用性的組態。您也可以設定 Flink 檢查點,讓重新啟動的工作從先前完成的檢查點 JobManager 復原執行中的工作。如需詳細資訊,請參閱 Flink 設定檢查點

對於使用 Flink 1.11.x 的 Amazon EMR 版本,您必須同時設定 () 和 JobManager (jobmanager.memory.process.size) 中的記憶體處理序總大小。 TaskManager taskmanager.memory.process.size flink-conf.yaml您可以透過使用組態 API 設定叢集或透過 SSH 手動取消註解這些欄位來設定這些值。Flink 提供了下列預設值。

  • jobmanager.memory.process.size:1600m

  • taskmanager.memory.process.size:1728m

若要排除 JVM 中繼空間和額外負荷,請使用 Flink 記憶體總大小 (taskmanager.memory.flink.size) 而非 taskmanager.memory.process.sizetaskmanager.memory.process.size 的預設值為 1280m。不建議同時設定 taskmanager.memory.process.sizetaskmanager.memory.process.size

所有使用 Flink 1.12.0 及更新版本的 Amazon EMR 版本都將 Flink 開放原始碼集中列出的預設值作為 Amazon EMR 上的預設值,因此您無需自行設定它們。

Flink 應用程式容器會建立並寫入三種類型的日誌檔案:.out 檔案、.log 檔案和 .err 檔案。僅 .err 檔案被壓縮並從檔案系統中移除,而 .log.out 日誌檔案仍保留在檔案系統中。為了確保這些輸出檔案保持可管理且叢集保持穩定,您可以在 log4j.properties 中設定日誌輪換以設定檔案數量上限並限制大小。

Amazon EMR 5.30.0 版及更新版本

從 Amazon EMR 5.30.0 開始,Flink 使用組態分類名稱為 flink-log4j. 的 log4j2 日誌記錄架構。下列範例組態示範了 log4j2 格式。

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

Amazon EMR 5.29.0 版及更早版本

在 Amazon EMR 5.29.0 版及更早版本中,Flink 使用 log4j 日誌記錄架構。下列範例組態示範了 log4j 格式。

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

Amazon EMR 6.12.0 版及更高版本為 Flink 提供了 Java 11 執行期支援。下列各章節描述如何設定叢集,以為 Flink 提供 Java 11 執行期支援。

使用下列步驟透過 Flink 和 Java 11 執行期建立 EMR 叢集。您在其中新增 Java 11 執行期支援的組態檔案是 flink-conf.yaml

New console
在新主控台中使用 Flink 和 Java 11 執行期建立叢集
  1. 登入並開啟 Amazon EMR 主控台 AWS Management Console,網址為 https://console.aws.amazon.com/emr

  2. 在導覽窗格中的 EC2 上的 EMR 下,選擇叢集,然後選擇建立叢集

  3. 選取 Amazon EMR 6.12.0 版或更高版本,然後選擇安裝 Flink 應用程式。選取您要在您的叢集上安裝的任何其他應用程式。

  4. 繼續設定您的叢集。在選用軟體設定區段中,使用預設輸入組態選項,然後輸入下列組態:

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. 繼續設定並啟動您的叢集。

AWS CLI
從 CLI 使用 Flink 和 Java 11 執行期建立叢集
  1. 建立將 Flink 設定為使用 Java 11 的組態檔案 configurations.json

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. 從中 AWS CLI建立具有 Amazon EMR 6.12.0 或更新版本的新 EMR 叢集,然後安裝 Flink 應用程式,如下列範例所示:

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

使用下列步驟透過 Flink 和 Java 11 執行期更新執行中的 EMR 叢集。您在其中新增 Java 11 執行期支援的組態檔案是 flink-conf.yaml

New console
在新主控台中使用 Flink 和 Java 11 執行期更新執行中的叢集
  1. 登入並開啟 Amazon EMR 主控台 AWS Management Console,網址為 https://console.aws.amazon.com/emr

  2. 在導覽窗格中的 EC2 上的 EMR 下,選擇叢集,然後選取您要更新的叢集。

    注意

    叢集必須使用 Amazon EMR 6.12.0 版或更高版本才能支援 Java 11。

  3. 選取組態標籤。

  4. 執行個體群組組態區段中,選取您要更新的執行中執行個體群組,然後從清單動作功能表中選擇重新設定

  5. 使用編輯屬性選項重新設定執行個體群組,如下所示。在每一項之後選取新增組態

    分類 屬性 Value

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. 選取儲存變更以新增組態。

AWS CLI
從 CLI 使用 Flink 和 Java 11 執行期更新執行中的叢集

使用 modify-instance-groups 命令,為執行中叢集中的執行個體群組指定新組態。

  1. 首先,建立將 Flink 設定為使用 Java 11 的組態檔案 configurations.json。在下列範例中,將 ig-1xxxxxxx9 取代為您要重新設定的執行個體群組的 ID。將檔案儲存在執行 modify-instance-groups 命令的相同目錄中。

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. 從 AWS CLI,執行下列命令。取代您要重新設定的執行個體群組的 ID:

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

若要確定執行中的叢集的 Java 執行期,請使用 SSH 登入主節點,如使用 SSH 連接至主節點中所述。然後執行以下命令:

ps -ef | grep flink

具有 -ef 選項的 ps 命令列出了系統上所有執行中的程序。您可以使用 grep 篩選該輸出,以尋找提及的字串 flink。檢閱 Java 執行階段環境 (JRE) 值 jre-XX 的輸出。在下列輸出中,jre-11 指出在執行期為 Flink 選擇 Java 11。

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

或者,使用 SSH 登入主節點,然後使用命令 flink-yarn-session -d 啟動 Flink YARN 工作階段。輸出顯示 Flink 的 Java 虛擬機器 (JVM),在下列範例中為 java-11-amazon-corretto

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64