書き込みパフォーマンスの最適化 - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

書き込みパフォーマンスの最適化

このセクションでは、エンジンに関係なく Iceberg テーブルの書き込みパフォーマンスを最適化するために調整できるテーブルプロパティについて説明します。

テーブル分散モードを設定する

Iceberg には、Spark タスク間での書き込みデータの分散方法を定義する複数の書き込み分散モードが用意されています。使用可能なモードの概要については、Iceberg ドキュメントの「Writing Distribution Modes」を参照してください。

特にストリーミングワークロードで書き込み速度を優先するユースケースでは、 write.distribution-modeを に設定しますnone。これにより、Iceberg は追加の Spark シャッフルをリクエストせず、Spark タスクで利用可能になるとデータが書き込まれます。このモードは、Spark 構造化ストリーミングアプリケーションに特に適しています。

注記

書き込みディストリビューションモードを に設定すると、多数の小さなファイルが生成されるnone傾向があり、読み取りパフォーマンスが低下します。これらの小さなファイルを適切なサイズのファイルに統合してクエリのパフォーマンスを向上させるには、定期的に圧縮することをお勧めします。

適切な更新戦略を選択する

最新のデータの読み込みオペレーションが遅いことがユースケースで許容できる場合は、 merge-on-read 戦略を使用して書き込みパフォーマンスを最適化します。

を使用すると merge-on-read、Iceberg は更新を書き込み、個別の小さなファイルとしてストレージを削除します。テーブルが読み取られると、リーダーはこれらの変更をベースファイルとマージして、データの最新のビューを返す必要があります。これにより、読み取りオペレーションのパフォーマンスが損なわれますが、更新と削除の書き込みが高速化されます。通常、 merge-on-read は、多数のテーブルパーティションに分散された更新プログラムや少数の更新ジョブを含むストリーミングワークロードに最適です。

merge-on-read 設定 (write.update.modewrite.delete.mode、および write.merge.mode) は、テーブルレベルで設定することも、アプリケーション側で個別に設定することもできます。

を使用するには、読み込みパフォーマンスが時間の経過とともに低下するのを防ぐために、定期的な圧縮を実行 merge-on-read する必要があります。圧縮は、更新と削除を既存のデータファイルと照合して新しいデータファイルセットを作成するため、読み取り側で発生するパフォーマンス上のペナルティを排除します。デフォルトでは、プロパティのデフォルトをよりdelete-file-threshold小さい値に変更しない限り、Iceberg の圧縮は削除ファイルをマージしません (Iceberg ドキュメント を参照)。圧縮の詳細については、このガイドの後半にある「Iceberg 圧縮」セクションを参照してください。

適切なファイル形式を選択する

Iceberg は、Parquet、ORC、および Avro 形式でのデータの書き込みをサポートしています。Parquet はデフォルトの形式です。Parquet と ORC は、優れた読み取りパフォーマンスを提供する列指向形式ですが、通常は書き込みが遅くなります。これは、読み取りパフォーマンスと書き込みパフォーマンスの一般的なトレードオフを表します。

ストリーミングワークロードなど、ユースケースで書き込み速度が重要な場合は、ライターのオプションで write-formatを に設定して Avro 形式でAvro書き込むことを検討してください。Avro は行ベースの形式であるため、書き込み時間が短縮され、読み取りパフォーマンスが低下します。

読み取りパフォーマンスを向上させるには、通常の圧縮を実行して小さな Avro ファイルをマージし、大きな Parquet ファイルに変換します。圧縮プロセスの結果は、write.format.defaultテーブル設定によって管理されます。Iceberg のデフォルトの形式は Parquet であるため、Avro で書き込み、圧縮を実行すると、Iceberg は Avro ファイルを Parquet ファイルに変換します。例を示します。

spark.sql(f""" CREATE TABLE IF NOT EXISTS glue_catalog.{DB_NAME}.{TABLE_NAME} ( Col_1 float, <<<…other columns…>> ts timestamp) USING iceberg PARTITIONED BY (days(ts)) OPTIONS ( 'format-version'='2', write.format.default'=parquet) """) query = df \ .writeStream \ .format("iceberg") \ .option("write-format", "avro") \ .outputMode("append") \ .trigger(processingTime='60 seconds') \ .option("path", f"glue_catalog.{DB_NAME}.{TABLE_NAME}") \ .option("checkpointLocation", f"s3://{BUCKET_NAME}/checkpoints/iceberg/") .start()