Flink を設定する - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Flink を設定する

Amazon EMR リリース 6.9.0 以降では、Hive メタストアとAWS Glue Catalog の両方がサポートされ、Hive への Apache Flink コネクタが使用されています。このセクションでは、Flink でAWS Glue Catalog と Hive メタストアの設定に必要なステップについて説明します。

  1. リリース 6.9.0 以降を使用して、HIVEFLINK の 2 つ以上のアプリケーションを含む Amazon EMR クラスターを作成します。

  2. スクリプトランナーを使用して、次のスクリプトをステップ関数として実行します。Amazon EMR クラスターでコマンドとスクリプトを実行します

    hive-metastore-setupsh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
  1. HIVEFLINK の 2 つ以上のアプリケーションを含む EMR-6.9.0 クラスターを作成します。

  2. EMR-6.9 クラスターを作成する際に、AWS Glue データカタログ設定で「Hive テーブルメタデータに使用」を選択し、クラスターのデータカタログを有効にします。

  3. スクリプトランナーを使用して、次のスクリプトをステップ関数として実行します。Amazon EMR クラスターでコマンドとスクリプトを実行します

    glue-catalog-setupsh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar

Hive メタストアに Apache Flink コネクタを使用する際の制限と考慮事項の詳細については、Apache Flink ドキュメントの Apache Hive インテグレーションページを参照してください。

設定ファイルを使用して Flink を設定します。たとえば、Flink の主となる設定ファイルは flink-conf.yaml と呼ばれます。このファイルは、Amazon EMR 設定 API を使用して設定することができます。

AWS CLI を使用して Flink のタスクのスロット数を設定するには
  1. 次のコンテンツを含む configurations.json ファイルを作成します。

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. 次に、次の設定でクラスターを作成します。

    aws emr create-cluster --release-label emr-5.36.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
注記

Flink API を使用して設定を変更することもできます。詳細については、Flink のドキュメントで「Concepts (概念)」を確認してください。

Amazon EMR バージョン 5.21.0 以降では、実行中のクラスター内のインスタンスグループごとに、クラスター設定を上書きして追加の設定分類を指定できます。これを行うには、Amazon EMR コンソール、AWS Command Line Interface (AWS CLI)、または AWS SDK を使用します。詳細については、「実行中のクラスター内のインスタンスグループの設定を指定する」を参照してください。

アプリケーションの所有者は、どのリソースが Flink 内のタスクに割り当てられるべきかを最もよく知っています。このドキュメントでは、例として、アプリケーション用に使っているスレーブインスタンスと同じ数のタスクを使用します。一般的には、並列処理の最初の段階にこれをすすめていますが、タスクスロットを使用して並列処理の粒度を高めることも可能です。こうすると、通常、インスタンスごとの仮想コア数を超過することはありません。Flink のアーキテクチャの詳細については、Flink ドキュメントで「Concepts (概念)」をご確認ください。

現在、Amazon EMR 設定 API で設定可能なファイルは次のとおりです。

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

JobManager of Flink は、複数のプライマリノードを持つ Amazon EMR クラスターのマスターノードフェイルオーバープロセス中も引き続き使用できます。Amazon EMR バージョン 5.28.0 以降、 JobManager 高可用性も自動的に有効になります。手動設定は必要ありません。

Amazon EMR バージョン 5.27.0 以前では、 JobManager は単一障害点です。 JobManager が失敗すると、ジョブの状態はすべて失われ、実行中のジョブは再開されません。次の例に示すように、アプリケーションの試行回数、チェックポイントを設定し、Flink ZooKeeper のステートストレージとして有効にすることで、 JobManager 高可用性を実現できます。

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

YARN の最大アプリケーションマスター試行回数と Flink のアプリケーション試行回数の両方を設定する必要があります。詳細については、「YARN クラスターの高可用性の設定」を参照してください。また、Flink チェックポイントを設定して、 JobManager 再起動すると実行中のジョブが以前に完了したチェックポイントから復元されるようにすることもできます。詳細については、「Flink のチェックポイント」を参照してください。

Flink 1.11.x を使用する Amazon EMR バージョンでは、の () と JobManager (jobmanager.memory.process.size) TaskManager の両方の合計メモリプロセスサイズを設定する必要がありますflink-conf.yamltaskmanager.memory.process.sizeこれらの値を設定するには、設定 API を使用してクラスターを設定するか、SSH を介してこれらのフィールドのコメントを手動で解除します。Flink は以下のデフォルト値を提供します。

  • jobmanager.memory.process.size: 1600m

  • taskmanager.memory.process.size: 1728m

JVM メタスペースとオーバーヘッドを除外する場合は、の代わりに Flink の合計メモリサイズ (taskmanager.memory.flink.size) を使用しますtaskmanager.memory.process.size。のデフォルト値は 1280mtaskmanager.memory.process.size です。taskmanager.memory.process.sizetaskmanager.memory.process.size の両方を設定することはお勧めしません。

Flink 1.12.0 以降を使用するすべての Amazon EMR バージョンでは、Flink のオープンソースにリストされているデフォルト値が Amazon EMR のデフォルト値として設定されているため、自分で設定する必要はありません。

Flink アプリケーションコンテナは、.out ファイル、.log ファイル、.err ファイルの 3 種類のログファイルを作成して書き込みます。.err ファイルのみが圧縮されてファイルシステムから削除され、.log および .out ログファイルはファイルシステムに残ります。これらの出力ファイルを管理しやすく、log4j.propertiesクラスターを安定させるために、ログローテーションを設定して、ファイルの最大数を設定し、それらのサイズを制限できます。

Amazon EMR バージョン 5.30.0 以降

Amazon EMR 5.30.0 以降、Flinkflink-log4j. は設定分類名の log4j2 ロギングフレームワークを使用します。次の設定例は log4j2 形式を示しています。

[ { "Classification": "flink-log4j", "Properties": { "rootLogger.appenderRef.rolling.ref": "RollingFileAppender", "appender.rolling.name": "RollingFileAppender", "appender.rolling.type":"RollingFile", "appender.rolling.append" : "false", "appender.rolling.fileName" : "${sys:log.file}", "appender.rolling.filePattern" : "${sys:log.file}.%i", "appender.rolling.layout.type" : "PatternLayout", "appender.rolling.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.rolling.policies.type" : "Policies", "appender.rolling.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.rolling.policies.size.size" : "100MB", "appender.rolling.strategy.type" : "DefaultRolloverStrategy", "appender.rolling.strategy.max" : "10" }, } ]

Amazon EMR バージョン 5.29.0 以前

Amazon EMR バージョン 5.29.0 以前では、Flink は log4j ロギングフレームワークを使用しています。次の設定例は、log4j 形式を示しています。

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]