Spark 結果フラグメントキャッシュ - Amazon EMR

Spark 結果フラグメントキャッシュ

Amazon EMR 6.6.0 以降には、結果フラグメントを自動的にキャッシュするオプションの Spark 結果フラグメントキャッシュ機能が含まれています。これらの結果フラグメントは、選択した Amazon S3 バケットに保存されているクエリのサブツリーからの結果の一部です。保存されたクエリの結果フラグメントは、その後のクエリ実行で再利用されるため、クエリが高速になります。

結果フラグメントキャッシュは、Spark SQL クエリを分析し、対象となる結果フラグメントを指定した S3 の場所にキャッシュすることで機能します。それ以降のクエリ実行では、使用可能なクエリの結果フラグメントが自動的に検出され、S3 から取得されます。結果フラグメントキャッシュは結果セットキャッシュとは異なります。結果セットキャッシュでは、それ以降のクエリがキャッシュから結果を返すには元のクエリと完全に一致する必要があります。結果フラグメントキャッシュは、データの静的なサブセットを繰り返しターゲットとするクエリに使用すると、パフォーマンスが大幅に向上します。

次のクエリを考えてみましょう。このクエリでは、2022 年までの注文がカウントされます。

select l_returnflag, l_linestatus, count(*) as count_order from lineitem where l_shipdate <= current_date and year(l_shipdate) == '2022' group by l_returnflag, l_linestatus

時間の経過に従い、このクエリを毎日実行して、その年の総売上をレポートする必要があります。結果フラグメントキャッシュを使用しないと、1 年のすべての日の結果を毎日再計算する必要があります。クエリは時間が経つにつれて遅くなり、365 日分の結果をすべて再計算する必要がある年末に最も遅くなります。

結果フラグメントキャッシュを有効にすると、キャッシュから 1 年のうちのそれ以前のすべての日の結果を使用します。毎日、この機能が再計算する必要があるのは 1 日分の結果のみです。この機能は結果フラグメントを計算した後で、フラグメントをキャッシュします。その結果、キャッシュを有効にしたクエリ時間は短く、それ以降の各クエリで一定に保たれます。

Spark 結果フラグメントキャッシュを有効にする

Spark 結果フラグメントキャッシュを有効にするには、以下の手順を実行します。

  1. Amazon S3 にキャッシュバケットを作成し、EMRFS の読み取り/書き込みアクセスを許可します。詳細については、「Amazon S3 内の EMRFS データへのアクセスを許可する」を参照してください。

  2. EMR Spark 設定を行い、この機能を有効にします。

    spark.subResultCache.enabled = true spark.subResultCache.fs.root.path = s3://DOC-EXAMPLE-BUCKET/cache_dir/
  3. バケットの S3 ライフサイクル管理を有効にして、キャッシュファイルを自動的にクリーンアップします。

  4. オプションで reductionRationThreshold プロパティと maxBufferSize プロパティを設定して、この機能をさらに調整できます。

    spark.sql.subResultCache.reductionRatioThreshold spark.sql.subResultCache.maxBufferSize

結果フラグメントキャッシュを使用するときの考慮事項

再計算するのではなく、Amazon S3 に既にキャッシュされている結果を使用する場合のコスト削減は、同じキャッシュ結果を使用できる回数に応じて大きくなります。大きなテーブルスキャンの後にフィルターやハッシュ集計を行うクエリでは、結果のサイズを 8 倍以上減らすことができれば (つまり、入力サイズ:結果の比率が 8:1 以上)、この機能のメリットは最も大きくなります。入力と結果の削減率が大きいほど、コスト面でのメリットも大きくなります。結果を生成するコストが Amazon S3 から結果を取得するコストよりも大きい限り、削減率は小さくても、テーブルスキャンとフィルターまたは集計の間に高価な計算ステップが含まれるクエリにもメリットがあります。デフォルトでは、結果フラグメントキャッシュは、削減率が少なくとも 8:1 になると検出された場合にのみ有効になります。

キャッシュされた結果をクエリが繰り返し再利用する場合、この機能のメリットが最も大きくなります。ローリングウィンドウクエリや増分ウィンドウクエリが良い例です。例えば、30 日間のローリングウィンドウクエリを既に 29 日間実行した場合、元の入力ソースからターゲットデータの 30 分の 1 を取得するだけで、過去 29 日間のキャッシュされた結果フラグメントが使用されます。増分ウィンドウクエリは、ウィンドウの開始が固定されているため、さらにメリットがあります。クエリを呼び出すたびに、入力ソースからの読み取りが必要になる処理の割合は小さくなります。

結果フラグメントキャッシュを使用する場合のその他の考慮事項は次のとおりです。

  • 同じクエリフラグメントで、同じデータを対象としないクエリは、キャッシュヒットレートが低くなるため、この機能のメリットはありません。

  • 削減率が低く、コストのかかる計算ステップを含まないクエリでは、結果がキャッシュされ、最初に処理したときとほぼ同じくらいの読み取りコストが発生します。

  • 最初のクエリでは、キャッシュへの書き込みコストがかかるため、常に軽微なリグレッションが発生します。

  • 結果フラグメントキャッシュ機能は Parquet ファイルでのみ機能します。その他のフェイル形式はサポートされていません。

  • 結果フラグメントキャッシュ機能のバッファは、ファイル分割サイズが 128 MB 以上のスキャンのキャッシュのみを試みます。デフォルトの Spark 構成では、スキャンサイズ (スキャン対象のすべてのファイルの合計サイズ) をエグゼキュターコアの数で割った値が 128 MB 未満の場合、結果フラグメントキャッシュは無効になります。以下に示す Spark 構成のいずれかが設定されている場合、ファイル分割サイズは次のようになります。

    min(maxPartitionBytes, max(openCostInBytes, scan size / minPartitionNum))
    • spark.sql.leafNodeDefaultParallelism (デフォルト値は spark.default.parallelism)

    • spark.sql.files.minPartitionNum (デフォルト値は spark.sql.leafNodeDefaultParallelism)

    • spark.sql.files.openCostInBytes

    • spark.sql.files.maxPartitionBytes

  • 結果フラグメントキャッシュ機能は RDD パーティションの詳細度でキャッシュします。前述の削減率 (デフォルトは 8:1) は、RDD パーティションごとに評価されます。RDD あたりの削減率が 8:1 より大きいワークロードと 8:1 未満のワークロードの両方が含まれる場合、RDD あたりの削減率が常に 8:1 未満のワークロードよりも、パフォーマンス上のメリットが小さくなることがあります。

  • 結果フラグメントキャッシュ機能は、キャッシュされる各 RDD パーティションにデフォルトで 16 MB の書き込みバッファを使用します。RDD パーティションごとに 16 MB を超える容量がキャッシュされる場合、書き込みが不可能であると判断するコストにより、パフォーマンスが低下する可能性があります。

  • デフォルトでは、結果フラグメントキャッシュは 8:1 未満の削減率では RDD パーティションの結果をキャッシュしようとせず、書き込みバッファを 16 MB に制限しますが、これらの値は両方とも以下の設定で調整できます。

    spark.sql.subResultCache.reductionRatioThreshold (default: 8.0) spark.sql.subResultCache.maxBufferSize (default: 16MB, max: 64MB)
  • 同じ EMR リリースを使用する複数のクラスターは、同じキャッシュ場所を共有できます。結果の正確性を確保するため、結果フラグメントキャッシュでは、異なるリリースの Amazon EMR で書き込まれたキャッシュ結果は使用されません。

  • 結果フラグメントキャッシュは、Spark ストリーミングのユースケースや RecordServer、Apache Ranger、または AWS Lake Formation が使用されている場合は自動的に無効になります。

  • 結果フラグメントキャッシュの読み取り/書き込みでは、EMRFS と Amazon S3 バケットが使用されます。CSE/SSE S3/SSE KMS 暗号化がサポートされています。