Spark の設定 - Amazon EMR

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

Spark の設定

Amazon EMR 上の Spark を設定するには、設定分類を使用します。設定分類の使用方法の詳細については「アプリケーションの設定」を参照してください。

Amazon EMR の Spark の設定分類には次が含まれます。

  • sparkmaximizeResourceAllocation プロパティを true または false に設定します。true の場合、Amazon EMR はクラスターハードウェア設定に基づいて、自動的に spark-defaults プロパティを設定します。詳細については、「の使用maximizeResourceAllocation」を参照してください。

  • spark-defaultsspark-defaults.conf ファイルで値を設定します。詳細については、Spark ドキュメントの「Spark Configuration」を参照してください。

  • spark-envspark-env.sh ファイルで値を設定します。詳細については、Spark ドキュメントの「Environment Variables」を参照してください。

  • spark-hive-site—Spark の hive-site.xml で値を設定します。

  • spark-log4jlog4j.properties ファイルで値を設定します。設定と詳細については Github の log4j.properties.template ファイルをご覧ください。

  • spark-metricsmetrics.properties ファイルで値を設定します。設定と詳細については Github の metrics.properties.template ファイルと Spark ドキュメントの Metrics をご覧ください。

Amazon EMR で設定される Spark のデフォルト

次の表はアプリケーションに影響する spark-defaults で Amazon EMR がどのようにデフォルト値を設定するか示しています。

Amazon EMR で設定される Spark のデフォルト
設定 説明
spark.executor.memory エグゼキュターのプロセスごとに使用するメモリの量 (例: 1g、2g)

設定は、クラスター内のコアおよびタスクインスタンスタイプに基づいて行われます。

spark.executor.cores 各エグゼキュターに使用するコアの数。 設定は、クラスター内のコアおよびタスクインスタンスタイプに基づいて行われます。
spark.dynamicAllocation.enabled 動的なリソース割り当てを使用するかどうか。動的なリソース割り当てでは、ワークロードによって、アプリケーションで登録するエグゼキュターの数を調整します。

true (emr-4.4.0 以上)

注記

Spark Shuffle Service は Amazon EMR によって自動的に設定されます。

6.1.0 での Spark ガベージコレクションの設定Amazon EMR

および spark.driver.extraJavaOptions でカスタムガベージコレクション設定を設定すると、spark.executor.extraJavaOptions 6.1.0 でガベージコレクション設定が競合するために、Amazon EMR 6.1 でドライバーまたはエグゼキュターの起動に失敗します。Amazon EMR6.1.0 の場合、デフォルトのガベージコレクション設定は Amazon EMR および spark.driver.defaultJavaOptions を通じて設定されます。spark.executor.defaultJavaOptions この設定は Amazon EMR 6.1.0 にのみ適用されます。ログ記録の設定用など、ガベージコレクションに関連しない JVM オプション (-verbose:class) は、引き続き extraJavaOptions で設定できます。 詳細については、「Spark アプリケーションのプロパティ」を参照してください。

の使用maximizeResourceAllocation

クラスター内の各ノードでリソース最大限に使用できるようにエグゼキュターを設定するには、spark 設定分類を使用して maximizeResourceAllocation オプションを true に設定します。この EMR 固有のオプションは、コアインスタンスグループのインスタンスのエグゼキュターで利用可能な最大のコンピューティングとメモリリソースを計算します。次に、この情報に基づいて対応する spark-defaults が設定されます。

[ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
設定は spark-defaults で設定されます (maximizeResourceAllocation が有効になっている場合)
設定 説明
spark.default.parallelism ユーザーによって設定されていない場合に、join、RDDs、parallelize などの変換によって返される reduceByKey のパーティションのデフォルト数。

YARN コンテナで使用可能な CPU コアの数 (2 の倍数)。

spark.driver.memory ドライバープロセス (例: SparkContext が初期化されている場所) に使用するメモリの量 (例: 1g、2g)。

設定は、クラスター内のスレーブインスタンスタイプに基づいて定義されます。ただし、Spark ドライバーアプリケーションは、マスターインスタンスまたはいずれかのコアインスタンスで (たとえば、YARN クライアントモードとクラスターモードのそれぞれで) 実行されるため、この設定は、これらの 2 つのインスタンスグループ間で、小さい方のインスタンスタイプに基づいて定義されます。

spark.executor.memory エグゼキュターのプロセスごとに使用するメモリの量 (例: 1g、2g)

設定は、クラスター内のコアおよびタスクインスタンスタイプに基づいて行われます。

spark.executor.cores 各エグゼキュターに使用するコアの数。 設定は、クラスター内のコアおよびタスクインスタンスタイプに基づいて行われます。
spark.executor.instances エグゼキュターの数。

設定は、クラスター内のコアおよびタスクインスタンスタイプに基づいて行われます。同時に spark.dynamicAllocation.enabled が明示的に true に設定されていない限り設定されます。

エグゼキュターの動的割り当ての有効化

YARN の Spark は、Spark アプリケーションに使用されるエグゼキュターの数を動的に増減することができます。Amazon EMR リリース 4.4.0 以降のバージョンでは、動的割り当てがデフォルトで有効になっています。詳細については Spark ドキュメントの「Dynamic Resource Allocation」と「Dynamic Allocation」のプロパティをご覧ください。

ノード停止の動作設定

Amazon EMR リリースバージョン 5.9.0 以降を使用する場合、Amazon EMR での Spark には、手動によるサイズ変更や自動スケーリングポリシーの要求によるノードの停止を Spark が適切に処理できるようにするための一連の機能が含まれています。Amazon EMR は Spark にブラックリストメカニズムを実装しています。これは YARN の停止メカニズム上に構築されています。このメカニズムにより、停止中のノードで新しいタスクがスケジュールされないようにし、同時に、既に実行中のタスクを完了するようにします。さらに、ノードの終了時にシャッフルブロックが失われた場合に、Spark のジョブを迅速に回復するための機能があります。再計算プロセスはすぐに実行され、より少ないステージの再試行でより迅速に再計算を行うように最適化されます。シャッフルブロックが見つからないことにより発生するフェッチのエラーによるジョブの失敗を避けることができます。

重要

Amazon EMR リリースバージョン 5.11.0 に spark.decommissioning.timeout.threshold 設定が追加され、スポットインスタンスを使用する場合の Spark の耐障害性が向上しました。これまでのリリースバージョンでは、ノードがスポットインスタンスを使用していて、インスタンスが入札価格のために終了した場合、Spark は終了を適切に処理できない場合があります。ジョブが失敗する場合があり、シャッフルの再計算に長い時間がかかる場合があります。このため、スポットインスタンスを使用する場合は、リリースバージョン 5.11.0 以降を使用することをお勧めします。

Spark のノード停止の設定
設定 説明 デフォルト値

spark.blacklist.decommissioning.enabled

true に設定すると、Spark は、YARN で decommissioning 状態にあるノードのブラックリストを作成します。Spark はそのノードで実行されるエグゼキュターに新しいタスクをスケジュールしません。既に実行中のタスクは完了することができます。

true

spark.blacklist.decommissioning.timeout

ノードが decommissioning 状態にある時間がブラックリストに載せられます。デフォルトでは、この値は 1 時間に設定されています。これは yarn.resourcemanager.decommissioning.timeout のデフォルトと同じです。 ノードが停止期間全体にわたってブラックリストに載せられるようにするには、この値を yarn.resourcemanager.decommissioning.timeout 以上に設定します。 停止のタイムアウトが期限切れになると、ノードは decommissioned 状態に移行し、Amazon EMR はノードの EC2 インスタンスを終了できます。タイムアウトの期限が切れた後も実行しているタスクがある場合、それらのタスクは失われるか、または強制終了され、他のノードで実行されているエグゼキュターで再スケジュールされます。

1h

spark.decommissioning.timeout.threshold

Amazon EMR リリースバージョン 5.11.0 以降で使用できます。指定は秒単位です。ノードが停止状態に移行する際、ホストがこの値以下の使用停止時間内に停止される場合、Amazon EMR はノードをブラックリストに追加するだけではなく、ノードが停止状態に移行するのを待たずに、ホストの状態を (spark.resourceManager.cleanupExpiredHost の指定に従って) クリーンアップします。これにより、スポットインスタンスが yarn.resourcemager.decommissioning.timeout の値にかかわらず 20 秒以内にタイムアウトするため、Spark がスポットインスタンスの終了をより良く処理できます。ただし、他のノードがシャッフルファイルを読み取るための時間が足りない場合があります。

20s

spark.resourceManager.cleanupExpiredHost

true に設定すると、Spark は、decommissioned 状態にあるノードのエグゼキュターに保存されているすべてのキャッシュデータとシャッフルブロックの登録を解除します。これにより、復旧プロセスが高速化されます。

true

spark.stage.attempt.ignoreOnDecommissionFetchFailure

true に設定すると、Spark でのステージの失敗と、停止ノードでの多数のフェッチの失敗によるジョブの失敗を回避できます。decommissioned 状態にあるノードのシャッフルブロックのフェッチの失敗は、連続するフェッチのエラーの最大数にカウントされません。

true

Spark ThriftServer 環境変数

Spark では、Hive Thrift Server Port 環境変数である HIVE_SERVER2_THRIFT_PORT が 10001 に設定されます。

Spark のデフォルト設定の変更

spark-defaults 設定分類を使用して、spark-defaults.conf 内のデフォルトを変更するか、spark 設定分類の maximizeResourceAllocation 設定を使用します。

次の手順では、CLI またはコンソールを使用して設定を変更する方法を示します。

CLI を使用して、spark.executor.memory が 2g に設定されたクラスターを作成するには

  • に格納されている spark.executor.memory ファイルを参照する次のコマンドを使用して、Spark がインストールされ、 が 2g に設定されたクラスターを作成します。myConfig.jsonAmazon S3

    aws emr create-cluster --release-label emr-5.31.0 --applications Name=Spark \ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json
    注記

    読みやすくするため、Linux の行連結文字 (\) を含めています。Linux コマンドでは、これらの文字は削除することも、使用することもできます。Windows の場合は、削除するか、キャレット (^) で置き換えます。

    myConfig.json:

    [ { "Classification": "spark-defaults", "Properties": { "spark.executor.memory": "2G" } } ]

コンソールを使用して、spark.executor.memory が 2g に設定されたクラスターを作成するには

  1. Amazon EMR コンソール (https://console.aws.amazon.com/elasticmapreduce/) を開きます。

  2. [Create cluster (クラスターの作成)]、[Go to advanced options (詳細オプションに移動する)] の順に選択します。

  3. [Spark] を選択します。

  4. [Edit software settings] (ソフトウェア設定の編集) で、[Enter configuration] (設定の入力) を選択したままにしておき、次の設定を入力します。

    classification=spark-defaults,properties=[spark.executor.memory=2G]
  5. 他のオプションを選択し、[] を選択して、[Create cluster] (クラスターの作成) を選択します。

を設定するにはmaximizeResourceAllocation

  • Amazon S3 に格納されている myConfig.json ファイルを参照して、Spark がインストールされ、maximizeResourceAllocation が true に設定されたクラスターを AWS CLI を使用して作成します。

    aws emr create-cluster --release-label emr-5.31.0 --applications Name=Spark \ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole --configurations https://s3.amazonaws.com/mybucket/myfolder/myConfig.json
    注記

    読みやすくするため、Linux の行連結文字 (\) を含めています。Linux コマンドでは、これらの文字は削除することも、使用することもできます。Windows の場合は、削除するか、キャレット (^) で置き換えます。

    myConfig.json:

    [ { "Classification": "spark", "Properties": { "maximizeResourceAllocation": "true" } } ]
注記

Amazon EMR バージョン 5.21.0 以降では、実行中のクラスター内のインスタンスグループごとに、クラスタ設定を上書きして追加の設定分類を指定できます。これを行うには、Amazon EMR コンソール、AWS Command Line Interface (AWS CLI)、または AWS SDK を使用します。詳細については、「実行中のクラスター内のインスタンスグループの設定を指定する」を参照してください。