Spark の設定 - Amazon EMR

Spark の設定

設定分類を使用して、Amazon EMR 上の Spark を設定できます。設定分類の詳細については「アプリケーションの設定」を参照してください。

Amazon EMR の Spark の設定分類には次が含まれます。

  • spark - maximizeResourceAllocation プロパティを true または false に設定します。true の場合、Amazon EMR はクラスターハードウェア設定に基づいて、自動的に spark-defaults プロパティを設定します。詳細については、「maximizeResourceAllocation を使用する」を参照してください。

  • spark-defaults - spark-defaults.conf ファイルに値を設定します。詳細については、Spark ドキュメントの「Spark Configuration」を参照してください。

  • spark-env - spark-env.sh ファイルに値を設定します。詳細については、Spark ドキュメントの「Environment Variables」を参照してください。

  • spark-hive-site - Spark の hive-site.xml に値を設定します。

  • spark-log4j — (Amazon EMR リリース 6.7.x 以前) log4j.properties ファイルに値を設定します。詳細については Github の log4j.properties.template ファイルをご覧ください。

  • spark-log4j2 — (Amazon EMR リリース 6.8.0 以降) log4j2.properties ファイルに値を設定します。詳細については、Github の log4j2.properties.template ファイルをご覧ください。

  • spark-metrics - metrics.properties ファイルに値を設定します。設定と詳細については Github の metrics.properties.template ファイルと Spark ドキュメントの Metrics をご覧ください。

注記

Spark ワークロードを別のプラットフォームから Amazon EMR に移行する場合、カスタム設定を追加する前に、Amazon EMR で設定される Spark のデフォルト でワークロードをテストすることをお勧めします。ほとんどのお客様は、デフォルト設定でパフォーマンスが向上されます。

Amazon EMR で設定される Spark のデフォルト

次の表はアプリケーションに影響する spark-defaults で Amazon EMR がどのようにデフォルト値を設定するか示しています。

Amazon EMR で設定される Spark のデフォルト
設定 説明 デフォルト値
spark.executor.memory

エグゼキュタープロセスごとに使用するメモリ量。例: 1g2g

この設定は、クラスター内のコアインスタンスタイプとタスクインスタンスタイプによって決まります。

spark.executor.cores

各エグゼキュターに使用するコアの数。

この設定は、クラスター内のコアインスタンスタイプとタスクインスタンスタイプによって決まります。

spark.dynamicAllocation.enabled

true の場合、動的なリソース割り当てを使用して、ワークロードによって、アプリケーションで登録するエグゼキュターの数を調整します。

true (Amazon EMR 4.4.0 以降を使用)

注記

Spark Shuffle Service は Amazon EMR によって自動的に設定されます。

spark.sql.hive.advancedPartitionPredicatePushdown.enabled

true の場合、Hive メタストアへの高度なパーティション述語プッシュダウンが有効になります。

true
spark.sql.hive.stringLikePartitionPredicatePushdown.enabled

startsWithcontains、および endsWith フィルターを Hive メタストアにプッシュダウンします。

注記

Glue は startsWithcontains、または endsWith の述語プッシュダウンをサポートしていません。Glue メタストアを使用していて、これらの関数の述語プッシュダウンが原因でエラーが発生した場合は、この設定を false にしてください。

true

Amazon EMR 6.1.0 での Spark ガベージコレクションの設定

spark.driver.extraJavaOptionsspark.executor.extraJavaOptions を使用してカスタムガーベッジコレクション構成を設定すると、Amazon EMR 6.1.0 とガーベッジコレクション設定で競合が生じるため、Amazon EMR 6.1 でのドライバーまたはエグゼキューター起動が失敗します。Amazon EMR 6.1.0 の場合、デフォルトのガベージコレクション設定は spark.driver.defaultJavaOptions および spark.executor.defaultJavaOptions を介して設定されます。この設定は、Amazon EMR 6.1.0 にのみ適用されます。ガーベッジコレクションに関連しない JVM オプション (ロギングの設定用のオプションなど (-verbose:class)) は、extraJavaOptions で設定できます。詳細については、「Spark application properties」を参照してください。

maximizeResourceAllocation を使用する

クラスター内の各ノードでリソースを最大限に使用できるようにエグゼキューターを設定するには、spark 設定分類で maximizeResourceAllocationtrue に設定します。maximizeResourceAllocation は Amazon EMR に固有のものです。maximizeResourceAllocation を有効にすると、EMR はコアインスタンスグループのインスタンスのエグゼキュターで利用可能な最大のコンピューティングとメモリリソースを計算します。次に、計算された最大値に基づいて対応する spark-defaults 設定を設定します。

注記

HBase などの他の分散アプリケーションでは、クラスターで maximizeResourceAllocation オプションを使用しないでください。Amazon EMR は分散アプリケーションにカスタム YARN 設定を使用するため、maximizeResourceAllocation と競合し、Spark アプリケーションが失敗する可能性があります。

以下に、maximizeResourceAllocationtrue に設定された Spark 設定分類の例を示します。

[ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
設定は spark-defaults で設定されます (maximizeResourceAllocation が有効になっている場合)
設定 説明
spark.default.parallelism ユーザーによって設定されていない場合に join、reduceByKey、parallelize などの変換によって返される、RDD のデフォルトパーティション数。

YARN コンテナで使用可能な CPU コアの数 (2 の倍数)。

spark.driver.memory ドライバープロセス (SparkContext が初期化されている場合) に使用するメモリの量 (例: 1g、2g)

設定は、クラスター内のスレーブインスタンスタイプに基づいて定義されます。ただし、Spark ドライバーアプリケーションは、プライマリインスタンスまたはいずれかのコアインスタンスで (例えば、YARN クライアントモードとクラスターモードのそれぞれで) 実行されるため、この設定は、これらの 2 つのインスタンスグループ間で、小さい方のインスタンスタイプに基づいて定義されます。

spark.executor.memory エグゼキュターのプロセスごとに使用するメモリの量 (例: 1g、2g)

設定は、クラスター内のコアおよびタスクインスタンスタイプに基づいて行われます。

spark.executor.cores 各エグゼキュターに使用するコアの数。 設定は、クラスター内のコアおよびタスクインスタンスタイプに基づいて行われます。
spark.executor.instances エグゼキュターの数。

設定は、クラスター内のコアおよびタスクインスタンスタイプに基づいて行われます。同時に spark.dynamicAllocation.enabled が明示的に true に設定されていない限り設定されます。

ノード停止の動作設定

Amazon EMR リリース 5.9.0 以降を使用する場合、Amazon EMR の Spark には、手動によるサイズ変更や自動のスケーリングポリシーの要求によるノードの停止を Spark が適切に処理できるようにするための、機能のセットが含まれています。Amazon EMR では Spark に拒否リストメカニズムを実装しています。これは YARN の停止メカニズムに基づいて構築されています。このメカニズムにより、停止中のノードで新しいタスクがスケジュールされないようにし、同時に、既に実行中のタスクを完了するようにします。さらに、ノードの終了時にシャッフルブロックが失われた場合に、Spark のジョブを迅速に回復するための機能があります。再計算プロセスはすぐに実行され、より少ないステージの再試行でより迅速に再計算を行うように最適化されます。シャッフルブロックが見つからないことにより発生するフェッチのエラーによるジョブの失敗を避けることができます。

重要

Amazon EMR リリース 5.11.0 に spark.decommissioning.timeout.threshold 設定が追加され、スポットインスタンスを使用する場合の Spark の耐障害性が向上しました。これまでのリリースでは、ノードがスポットインスタンスを使用していて、インスタンスが入札価格のために終了した場合、Spark は終了を適切に処理できない場合があります。ジョブが失敗する場合があり、シャッフルの再計算に長い時間がかかる場合があります。このため、スポットインスタンスを使用する場合は、リリース 5.11.0 以降を使用することをお勧めします。

Spark のノード停止の設定
設定 説明 デフォルト値

spark.blacklist.decommissioning.enabled

true に設定すると、Spark は、YARN で decommissioning 状態にあるノードの拒否リストを作成します。Spark はそのノードで実行されるエグゼキュターに新しいタスクをスケジュールしません。既に実行中のタスクは完了することができます。

true

spark.blacklist.decommissioning.timeout

decommissioning 状態にあるノードが拒否リストに載せられる時間。デフォルトではこの値は 1 時間に設定されています。これは yarn.resourcemanager.decommissioning.timeout のデフォルトと同じです。ノードが停止期間全体にわたって拒否リストに載せられるようにするには、この値を yarn.resourcemanager.decommissioning.timeout 以上に設定します。停止のタイムアウトの期限が切れた後、ノードは decommissioned 状態に移行し、Amazon EMR はノードの EC2 インスタンスを終了することができます。タイムアウトの期限が切れた後も実行しているタスクがある場合、それらのタスクは失われるか、または強制終了され、他のノードで実行されているエグゼキュターで再スケジュールされます。

1h

spark.decommissioning.timeout.threshold

Amazon EMR リリース 5.11.0 以降で使用できます。指定は秒単位です。ノードが停止状態に移行する際、ホストがこの値以下の使用停止時間内に停止される場合、Amazon EMR はノードを拒否リストに追加するだけではなく、ノードが停止状態に移行するのを待たずに、ホストの状態を (spark.resourceManager.cleanupExpiredHost の指定に従って) クリーンアップします。これにより、スポットインスタンスが yarn.resourcemager.decommissioning.timeout の値にかかわらず 20 秒以内にタイムアウトするため、Spark がスポットインスタンスの終了をより良く処理できます。ただし、他のノードがシャッフルファイルを読み取るための時間が足りない場合があります。

20s

spark.resourceManager.cleanupExpiredHost

true に設定すると、Spark は、decommissioned 状態にあるノードのエグゼキュターに保存されているすべてのキャッシュデータとシャッフルブロックの登録を解除します。これにより、復旧プロセスが高速化されます。

true

spark.stage.attempt.ignoreOnDecommissionFetchFailure

true に設定すると、Spark でのステージの失敗と、停止ノードでの多数のフェッチの失敗によるジョブの失敗を回避できます。decommissioned 状態にあるノードのシャッフルブロックのフェッチの失敗は、連続するフェッチのエラーの最大数にカウントされません。

true

Spark の ThriftServer 環境変数

Spark では、Hive Thrift Server Port 環境変数である HIVE_SERVER2_THRIFT_PORT が 10001 に設定されます。

Spark のデフォルト設定の変更

spark-defaults 設定分類を使用して、spark-defaults.conf 内のデフォルトを変更するか、spark 設定分類の maximizeResourceAllocation 設定を使用します。

次の手順では、CLI またはコンソールを使用して設定を変更する方法を示します。

CLI を使用して、spark.executor.memory が 2g に設定されたクラスターを作成する
  • Amazon S3 に格納されている myConfig.json ファイルを参照する次のコマンドを使用して、Spark がインストールされ、spark.executor.memory が 2g に設定されたクラスターを作成します。

    aws emr create-cluster --release-label emr-5.36.1 --applications Name=Spark \ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json
    注記

    読みやすくするために、Linux 行連続文字 (\) が含まれています。Linux コマンドでは、これらは削除することも、使用することもできます。Windows の場合、削除するか、キャレット (^) に置き換えてください。

    myConfig.json:

    [ { "Classification": "spark-defaults", "Properties": { "spark.executor.memory": "2G" } } ]
コンソールを使用して、spark.executor.memory が 2g に設定されたクラスターを作成する
  1. 新しい Amazon EMR コンソールに移動し、サイドナビゲーションから [古いコンソールに切り替え] を選択します。古いコンソールに切り替えたときの動作の詳細については、「Using the old console」を参照してください。

  2. [Create cluster (クラスターの作成)]、[Go to advanced options (詳細オプションに移動する)] の順に選択します。

  3. [Spark] を選択します。

  4. [Edit software settings] (ソフトウェア設定の編集) で、[Enter configuration] (設定の入力) を選択したままにしておき、次の設定を入力します。

    classification=spark-defaults,properties=[spark.executor.memory=2G]
  5. 他のオプションを選択し、 を選択して、[Create cluster] (クラスターの作成) を選択します。

maximizeResourceAllocation を設定するには
  • Amazon S3 に格納されている myConfig.json ファイルを参照して、Spark がインストールされ、maximizeResourceAllocation が true に設定されたクラスターを AWS CLI を使用して作成します。

    aws emr create-cluster --release-label emr-5.36.1 --applications Name=Spark \ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json
    注記

    読みやすくするために、Linux 行連続文字 (\) が含まれています。Linux コマンドでは、これらは削除することも、使用することもできます。Windows の場合、削除するか、キャレット (^) に置き換えてください。

    myConfig.json:

    [ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
注記

Amazon EMR バージョン 5.21.0 以降では、実行中のクラスター内のインスタンスグループごとに、クラスター設定を上書きして追加の設定分類を指定できます。これを行うには、Amazon EMR コンソール、AWS Command Line Interface (AWS CLI)、または AWS SDK を使用します。詳細については、「実行中のクラスター内のインスタンスグループの設定を指定する」を参照してください。

Apache Log4j 1.x から Log4j 2.x への移行

Apache Spark リリース 3.2.x 以前のリリースでは、レガシー Apache Log4j 1.x とその log4j.properties ファイルを使用して Spark プロセスで Log4j を設定します。Apache Spark リリース 3.3.0 以降のリリースでは、Apache Log4j 2.x とその log4j2.properties ファイルを使用して Spark プロセスで Log4j を設定します。

6.8.0 より前の Amazon EMR リリースを使用して Apache Spark Log4j を設定した場合、Amazon EMR 6.8.0 以降にアップグレードする前に、従来の spark-log4j 設定分類を削除し、spark-log4j2 設定分類とキー形式に移行する必要があります。Amazon EMR リリース 6.8.0 以降では、従来の spark-log4j 分類によりクラスターの作成が ValidationException エラーで失敗します。Log4j の非互換性に関連する障害については課金されませんが、続行するには無効になっている spark-log4j 設定分類を削除する必要があります。

Apache Log4j 1.x から Log4j 2.x への移行に関する詳細は、Github の「Apache Log4j Migration Guide」と「Spark Log4j 2 Template」を参照してください。

注記

Amazon EMR では、Apache Spark は「Apache Log4j Migration Guide」で説明されている.xml ファイルではなく log4j2.properties ファイルを使用します。また、Log4j 1.x ブリッジメソッドを使用して Log4j 2.x に変換することはお勧めしません。