Apache Spark を使用した Apache Iceberg テーブルの操作 - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Spark を使用した Apache Iceberg テーブルの操作

このセクションでは、Apache Spark を使用して Iceberg テーブルを操作する方法の概要を説明します。例は、Amazon EMR または で実行できる定型コードです AWS Glue。

注: Iceberg テーブルを操作するための主なインターフェイスは SQL であるため、ほとんどの例では Spark SQL と DataFrames API が組み合わされます。

Iceberg テーブルの作成と書き込み

Spark SQL と Spark を使用して、データ DataFrames を作成して Iceberg テーブルに追加できます。

Spark SQL の使用

Iceberg データセットを記述するには、 CREATE TABLEや などの標準の Spark SQL ステートメントを使用しますINSERT INTO

パーティション分割されていないテーブル

Spark SQL を使用してパーティション分割されていない Iceberg テーブルを作成する例を次に示します。

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg OPTIONS ('format-version'='2') """)

パーティション分割されていないテーブルにデータを挿入するには、標準INSERT INTOステートメントを使用します。

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)

パーティションテーブル

Spark SQL でパーティション化された Iceberg テーブルを作成する例を次に示します。

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg PARTITIONED BY (c_birth_country) OPTIONS ('format-version'='2') """)

Spark SQL を使用してパーティション化された Iceberg テーブルにデータを挿入するには、グローバルソートを実行してからデータを書き込みます。

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table ORDER BY c_birth_country """)

DataFrames API の使用

Iceberg データセットを記述するには、 DataFrameWriterV2 API を使用できます。

Iceberg テーブルを作成してデータを書き込むには、df.writeTo(t) 関数を使用します。テーブルが存在する場合は、 .append()関数を使用します。そうでない場合は、.create().次の例で を使用します。これは .createOrReplace()に相当する のバリエーション.create()ですCREATE OR REPLACE TABLE AS SELECT

パーティション分割されていないテーブル

DataFrameWriterV2 API を使用してパーティション分割されていない Iceberg テーブルを作成して入力するには:

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .tableProperty("format-version", "2") \ .createOrReplace()

DataFrameWriterV2 API を使用して、パーティション分割されていない既存の Iceberg テーブルにデータを挿入するには:

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .append()

パーティションテーブル

DataFrameWriterV2 API を使用してパーティション化された Iceberg テーブルを作成して入力するには、ローカルソートを使用してデータを取り込むことができます。

input_data.sortWithinPartitions("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .tableProperty("format-version", "2") \ .partitionedBy("c_birth_country") \ .createOrReplace()

DataFrameWriterV2 API を使用してパーティション化された Iceberg テーブルにデータを挿入するには、グローバルソートを使用してデータを取り込むことができます。

input_data.orderBy("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .append()

Iceberg テーブルのデータの更新

次の例は、Iceberg テーブルのデータを更新する方法を示しています。この例では、c_customer_sk列に偶数を持つすべての行を変更します。

spark.sql(f""" UPDATE {CATALOG_NAME}.{db.name}.{table.name} SET c_email_address = 'even_row' WHERE c_customer_sk % 2 == 0 """)

このオペレーションはデフォルトの copy-on-write 戦略を使用するため、影響を受けるすべてのデータファイルを書き換えます。

Iceberg テーブルのデータの更新

データの更新とは、新しいデータレコードを挿入し、既存のデータレコードを 1 つのトランザクションに更新することです。データを Iceberg テーブルにアップサートするには、 SQL MERGE INTOステートメントを使用します。 

次の例では、テーブル 内のテーブル {UPSERT_TABLE_NAME} の内容をアップサートします{TABLE_NAME}

spark.sql(f""" MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t USING {UPSERT_TABLE_NAME} s ON t.c_customer_id = s.c_customer_id WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address WHEN NOT MATCHED THEN INSERT * """)
  • にある顧客レコードが同じ {TABLE_NAME}{UPSERT_TABLE_NAME}すでに存在する場合c_customer_id{UPSERT_TABLE_NAME}レコードc_email_address値は既存の値を上書きします (更新オペレーション)。

  • にある顧客レコードが に存在し{UPSERT_TABLE_NAME}ない場合{TABLE_NAME}{UPSERT_TABLE_NAME}レコードは {TABLE_NAME} (オペレーションを挿入) に追加されます。

Iceberg テーブル内のデータの削除

Iceberg テーブルからデータを削除するには、 DELETE FROM式を使用して、削除する行に一致するフィルターを指定します。

spark.sql(f""" DELETE FROM {CATALOG_NAME}.{db.name}.{table.name} WHERE c_customer_sk % 2 != 0 """)

フィルターがパーティション全体と一致する場合、Iceberg はメタデータのみの削除を実行し、データファイルをそのまま残します。それ以外の場合は、影響を受けるデータファイルのみを書き換えます。

delete メソッドは、 WHERE句の影響を受けるデータファイルを取得し、削除されたレコードなしでそれらのコピーを作成します。次に、新しいデータファイルを指す新しいテーブルスナップショットを作成します。したがって、削除されたレコードはテーブルの古いスナップショットにまだ存在します。例えば、テーブルの前のスナップショットを取得すると、先ほど削除したデータが表示されます。クリーンアップの目的で関連データファイルを使用して不要な古いスナップショットを削除する方法については、このガイドの後半の「圧縮を使用してファイルを維持する」セクションを参照してください。

データの読み込み

Spark SQL と の両方で、Spark の Iceberg テーブルの最新ステータスを読み取ることができます DataFrames。 

Spark SQL の使用例:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5 """)

DataFrames API の使用例:

df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)

タイムトラベルの使用

Iceberg テーブル内の書き込みオペレーション (挿入、更新、アップサート、削除) ごとに、新しいスナップショットが作成されます。その後、これらのスナップショットをタイムトラベルに使用できます。タイムトラベルをさかのぼって、過去のテーブルのステータスを確認できます。

snapshot-id とタイミング値を使用してテーブルのスナップショットの履歴を取得する方法については、このガイドの後半にある「メタデータへのアクセス」セクションを参照してください。

次のタイムトラベルクエリは、特定の に基づいてテーブルのステータスを表示しますsnapshot-id

Spark SQL の使用:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id} """)

DataFrames API の使用:

df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \ .format("iceberg") \ .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

次のタイムトラベルクエリは、特定のタイムスタンプの前に作成された最後のスナップショットに基づいて、テーブルのステータスをミリ秒 () 単位で表示しますas-of-timestamp

Spark SQL の使用:

spark.sql(f""" SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}' """)

DataFrames API の使用:

df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \ .format("iceberg") \ .load(f"dev.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

増分クエリの使用

Iceberg スナップショットを使用して、追加されたデータを増分的に読み取ることもできます。 

注: 現在、このオペレーションはappendスナップショットからのデータの読み取りをサポートしています。、、または などのオペレーションからのデータの取得はサポートされていませんreplaceoverwritedelete。  さらに、増分読み取りオペレーションは Spark SQL 構文ではサポートされていません。

次の例では、スナップショット start-snapshot-id (排他的) と end-snapshot-id (包括的) の間に Iceberg テーブルに追加されたすべてのレコードを取得します。

df_incremental = (spark.read.format("iceberg") .option("start-snapshot-id", snapshot_id_start) .option("end-snapshot-id", snapshot_id_end) .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}") )

メタデータへのアクセス

Iceberg は SQL を介してメタデータへのアクセスを提供します。特定のテーブル (<table_name>) のメタデータにアクセスするには、 という名前空間をクエリします<table_name>.<metadata_table>。メタデータテーブルの完全なリストについては、Iceberg ドキュメントの「テーブルの検査」を参照してください。

次の例は、Iceberg テーブルのコミット (変更) の履歴を示す Iceberg 履歴メタデータテーブルにアクセスする方法を示しています。 

Amazon EMR Studio ノートブックからの Spark SQL ( %%sqlマジックを使用) の使用:

Spark.sql(f“”” SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5 """)

DataFrames API の使用:

spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)

サンプル出力:

Iceberg テーブルからのメタデータ出力例