Amazon EMR
Amazon EMR リリース ガイド

Spark の設定

クラスターの作成時に設定分類を使用して Spark on Amazon EMR を設定できます。設定分類の使用方法の詳細については「アプリケーションの設定」を参照してください。

Amazon EMR の Spark の設定分類には次が含まれます。

  • sparkmaximizeResourceAllocation プロパティを true または false に設定します。true の場合、Amazon EMR はクラスターハードウェア設定に基づいて、自動的に spark-default プロパティを設定します。詳細については、「maximizeResourceAllocation を使用する」を参照してください。

  • spark-defaultsspark-defaults.conf ファイルで値を設定します。詳細については、Spark ドキュメントの「Spark Configuration」を参照してください。

  • spark-envspark-env.sh ファイルで値を設定します。詳細については、Spark ドキュメントの「Environment Variables」を参照してください。

  • spark-hive-site— Spark の hive-site.xml で値を設定します。

  • spark-log4jlog4j.properties ファイルで値を設定します。設定と詳細については Github の log4j.properties.template ファイルをご覧ください。

  • spark-metricsmetrics.properties ファイルで値を設定します。設定と詳細については Github の metrics.properties.template ファイルと Spark ドキュメントの Metrics をご覧ください。

Amazon EMR で設定される Spark のデフォルト

次の表はアプリケーションに影響する spark-default で Amazon EMR がどのようにデフォルト値を設定するか示しています。

Amazon EMR で設定される Spark のデフォルト

設定 説明
spark.executor.memory エグゼキュータのプロセスごとに使用するメモリの量 (例: 1g、2g)

設定は、クラスター内のスレーブインスタンスタイプに基づいて定義されます。

spark.executor.cores 各エグゼキュータに使用するコアの数。 設定は、クラスター内のスレーブインスタンスタイプに基づいて定義されます。
spark.dynamicAllocation.enabled 動的なリソース割り当てを使用するかどうか。動的なリソース割り当てでは、ワークロードによって、アプリケーションで登録するエグゼキュータの数を調整します。

true (emr-4.4.0 以上)

注記

Spark Shuffle Service は Amazon EMR によって自動的に設定されます。

maximizeResourceAllocation を使用する

クラスターの作成時に、spark 設定分類を使用して maximizeResourceAllocation オプションを true にすることで、クラスター内の各ノードでリソース最大限に使用できるようにエグゼキュターを設定することができます。この EMR 固有のオプションは、コアインスタンスグループのインスタンスのエグゼキューターで利用可能な最大のコンピューティングとメモリリソースを計算します。次に、この情報に基づいて対応する spark-defaults が設定されます。

[ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]

設定は spark-defaults で設定されます (maximizeResourceAllocation が有効になっている場合)

設定 説明
spark.default.parallelism ユーザーによって設定されていない場合に join、reduceByKey、parallelize などの変換によって返される、RDD のデフォルトパーティション数。

YARN コンテナで使用可能な CPU コアの数 (2 の倍数)。

spark.driver.memory ドライバープロセス (SparkContext が初期化されている場合) に使用するメモリの量 (例: 1g、2g)。

設定は、クラスター内のスレーブインスタンスタイプに基づいて定義されます。ただし、Spark ドライバアプリケーションは、マスターインスタンスまたはいずれかのコアインスタンスで (たとえば、YARN クライアントモードとクラスターモードのそれぞれで) 実行されるため、この設定は、これらの 2 つのインスタンスグループ間で、小さい方のインスタンスタイプに基づいて定義されます。

spark.executor.memory エグゼキュータのプロセスごとに使用するメモリの量 (例: 1g、2g)

設定は、クラスター内のスレーブインスタンスタイプに基づいて定義されます。

spark.executor.cores 各エグゼキュータに使用するコアの数。 設定は、クラスター内のスレーブインスタンスタイプに基づいて定義されます。
spark.executor.instances エグゼキュータの数。

設定は、クラスター内のスレーブインスタンスタイプに基づいて定義されます。同時に spark.dynamicAllocation.enabled が明示的に true に設定されていない限り設定されます。

エグゼキュータの動的割り当ての有効化

YARN の Spark は、Spark アプリケーションに使用されるエグゼキュータの数を動的に増減することができます。Amazon EMR リリース 4.4.0 以降のバージョンでは、動的割り当てがデフォルトで有効になっています。詳細については Spark ドキュメントの「Dynamic Resource Allocation」と「Dynamic Allocation」のプロパティをご覧ください。

ノード停止の動作設定

Amazon EMR リリースバージョン 5.9.0 以降を使用する場合、Amazon EMR の Spark には、手動によるサイズ変更や自動のスケーリングポリシーの要求によるノードの停止を、Spark が適切に処理できるようにするための、機能のセットが含まれています。Amazon EMR では Spark にブラックリストメカニズムを実装しています。これは YARN の停止メカニズム上の構築されています。このメカニズムにより、停止中のノードで新しいタスクがスケジュールされないようにし、同時に、既に実行中のタスクを完了するようにします。さらに、ノードの終了時にシャッフルブロックが失われた場合に、Spark のジョブを迅速に回復するための機能があります。再計算プロセスはすぐに実行され、より少ないステージの再試行でより迅速に再計算を行うように最適化されます。シャッフルブロックが見つからないことにより発生するフェッチのエラーによるジョブの失敗を避けることができます。

重要

Amazon EMR リリースバージョン 5.11.0 に spark.decommissioning.timeout.threshold 設定が追加され、スポットインスタンスを使用する場合の Spark の耐障害性が向上しました。これまでのリリースバージョンでは、ノードがスポットインスタンスを使用していて、インスタンスが入札価格のために終了した場合、Spark は終了を適切に処理できない場合があります。ジョブが失敗する場合があり、シャッフルの再計算に長い時間がかかる場合があります。このため、スポットインスタンスを使用する場合は、リリースバージョン 5.11.0 以降を使用することをお勧めします。

Spark のノード停止の設定

設定 説明 デフォルト値

spark.blacklist.decommissioning.enabled

true に設定すると、Spark は、YARN で decommissioning 状態にあるノードのブラックリストを作成します。Spark はそのノードで実行されるエグゼキュターに新しいタスクをスケジュールしません。既に実行中のタスクは完了することができます。

true

spark.blacklist.decommissioning.timeout

ノードが decommissioning 状態にある時間がブラックリストに載せられます。デフォルトではこの値は 1 時間に設定されています。これは yarn.resourcemanager.decomissioning.timeout のデフォルトと同じです。ノードが停止期間全体にわたってブラックリストに載せられるようにするには、この値を yarn.resourcemanager.decommissioning.timeout 以上に設定します。停止のタイムアウトの期限が切れた後、ノードは decommissioned 状態に移行し、Amazon EMR はノードの EC2 インスタンスを終了することができます。タイムアウトの期限が切れた後も実行しているタスクがある場合、それらのタスクは失われるか、または強制終了され、他のノードで実行されているエグゼキュターで再スケジュールされます。

1h

spark.decommissioning.timeout.threshold

Amazon EMR リリースバージョン 5.11.0 以降で使用できます。指定は秒単位です。ノードを廃棄状態に移行する際、ホストがこの値以下の使用停止時間内に廃棄される場合、Amazon EMR はノードをブラックリストに載せるだけではなく、ノードが廃棄状態に移行するのを待たずに、ホストの状態をクリーンアップ (spark.resourceManager.cleanupExpiredHost の指定による) します。これにより、スポットインスタンスが yarn.resourcemager.decommissioning.timeout の値にかかわらず 20 秒以内にタイムアウトするため、Spark がスポットインスタンスの終了をより良く処理できます。ただし、他のノードがシャッフルファイルを読み取るための時間が足りない場合があります。

20s

spark.resourceManager.cleanupExpiredHost

true に設定すると、Spark は、decommissioned 状態にあるノードのエグゼキュターに保存されているすべてのキャッシュデータとシャッフルブロックの登録を解除します。これにより、復旧プロセスが高速化されます。

true

spark.stage.attempt.ignoreOnDecommissionFetchFailure

true に設定すると、Spark でのステージの失敗と、停止ノードでの多数のフェッチの失敗によるジョブの失敗を回避できます。decommissioned 状態にあるノードのシャッフルブロックのフェッチの失敗は、連続するフェッチのエラーの最大数にカウントされません。

true

Spark の ThriftServer 環境変数

Spark では、Hive Thrift Server Port 環境変数である HIVE_SERVER2_THRIFT_PORT が 10001 に設定されます。

Spark のデフォルト設定の変更

クラスターを作成する場合に spark-defaults.conf 設定分類を使用して、spark-defaults 内のデフォルトを変更することができます。maximizeResourceAllocation 設定分類の spark 設定を使用することもできます。

次の手順では、CLI またはコンソールを使用して設定を変更する方法を示します。

CLI を使用して、spark.executor.memory が 2G に設定されたクラスターを作成するには

  • Amazon S3 に格納されている myConfig.json ファイルを参照する次のコマンドを使用して、Spark がインストールされ、spark.executor.memory が 2G に設定されたクラスターを作成します。

    aws emr create-cluster --release-label emr-5.14.0 --applications Name=Spark \ --instance-type m4.large --instance-count 2 --service-role EMR_DefaultRole --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json

    注記

    読みやすくするため、Linux の行連結文字 (\) を含めています。Linux コマンドでは、これらの文字は削除することも、使用することもできます。Windows の場合は、削除するか、キャレット (^) で置き換えます。

    myConfig.json:

    [ { "Classification": "spark-defaults", "Properties": { "spark.executor.memory": "2G" } } ]

コンソールを使用して、spark.executor.memory が 2G に設定されたクラスターを作成するには

  1. Amazon EMR コンソール (https://console.aws.amazon.com/elasticmapreduce/) を開きます。

  2. Choose Create cluster, Go to advanced options.

  3. [Spark] を選択します。

  4. [Edit software settings] (ソフトウェア設定の編集) で、[Enter configuration] (設定の入力) を選択したままにしておき、次の設定を入力します。

    classification=spark-defaults,properties=[spark.executor.memory=2G]
  5. 他のオプションを選択し、[] を選択して、[Create cluster] (クラスターの作成) を選択します。

maximizeResourceAllocation を設定するには

  • Amazon S3 に格納されている myConfig.json ファイルを参照して、Spark がインストールされ、maximizeResourceAllocation が true に設定されたクラスターを AWS CLI を使用して作成します。

    aws emr create-cluster --release-label emr-5.14.0 --applications Name=Spark \ --instance-type m4.large --instance-count 2 --service-role EMR_DefaultRole --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json

    注記

    読みやすくするため、Linux の行連結文字 (\) を含めています。Linux コマンドでは、これらの文字は削除することも、使用することもできます。Windows の場合は、削除するか、キャレット (^) で置き換えます。

    myConfig.json:

    [ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]