Bekerja dengan tabel Apache Iceberg dengan menggunakan Apache Spark - AWS Panduan Preskriptif

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Bekerja dengan tabel Apache Iceberg dengan menggunakan Apache Spark

Bagian ini memberikan gambaran umum tentang penggunaan Apache Spark untuk berinteraksi dengan tabel Iceberg. Contohnya adalah kode boilerplate yang dapat berjalan di Amazon EMR atau. AWS Glue

Catatan: Antarmuka utama untuk berinteraksi dengan tabel Iceberg adalah SQL, sehingga sebagian besar contoh akan menggabungkan Spark SQL dengan API. DataFrames

Membuat dan menulis tabel Iceberg

Anda dapat menggunakan Spark SQL dan Spark DataFrames untuk membuat dan menambahkan data ke tabel Iceberg.

Menggunakan Spark SQL

Untuk menulis dataset Iceberg, gunakan pernyataan SQL Spark standar seperti dan. CREATE TABLE INSERT INTO

Tabel yang tidak dipartisi

Berikut adalah contoh membuat tabel Iceberg yang tidak dipartisi dengan Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg OPTIONS ('format-version'='2') """)

Untuk menyisipkan data ke dalam tabel yang tidak dipartisi, gunakan pernyataan standar: INSERT INTO

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)

Tabel yang dipartisi

Berikut adalah contoh membuat tabel Iceberg yang dipartisi dengan Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg PARTITIONED BY (c_birth_country) OPTIONS ('format-version'='2') """)

Untuk menyisipkan data ke dalam tabel Iceberg yang dipartisi dengan Spark SQL, Anda melakukan pengurutan global dan kemudian menulis data:

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table ORDER BY c_birth_country """)

Menggunakan DataFrames API

Untuk menulis kumpulan data Iceberg, Anda dapat menggunakan API. DataFrameWriterV2

Untuk membuat tabel Iceberg dan menulis data untuk itu, gunakan fungsi df.writeTo( t). Jika tabel ada, gunakan .append() fungsi. Jika tidak, gunakan Contoh .create(). berikut digunakan.createOrReplace(), yang merupakan variasi .create() yang setara denganCREATE OR REPLACE TABLE AS SELECT.

Tabel yang tidak dipartisi

Untuk membuat dan mengisi tabel Iceberg yang tidak dipartisi dengan menggunakan API: DataFrameWriterV2

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .tableProperty("format-version", "2") \ .createOrReplace()

Untuk menyisipkan data ke dalam tabel Iceberg tak terpartisi yang sudah ada dengan menggunakan API: DataFrameWriterV2

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .append()

Tabel yang dipartisi

Untuk membuat dan mengisi tabel Iceberg yang dipartisi menggunakan DataFrameWriterV2 API, Anda dapat menggunakan pengurutan lokal untuk menyerap data:

input_data.sortWithinPartitions("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .tableProperty("format-version", "2") \ .partitionedBy("c_birth_country") \ .createOrReplace()

Untuk menyisipkan data ke dalam tabel Iceberg yang dipartisi menggunakan DataFrameWriterV2 API, Anda dapat menggunakan pengurutan global untuk menyerap data:

input_data.orderBy("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .append()

Memperbarui data dalam tabel Iceberg

Contoh berikut menunjukkan cara memperbarui data dalam tabel Iceberg. Contoh ini memodifikasi semua baris yang memiliki nomor genap di c_customer_sk kolom.

spark.sql(f""" UPDATE {CATALOG_NAME}.{db.name}.{table.name} SET c_email_address = 'even_row' WHERE c_customer_sk % 2 == 0 """)

Operasi ini menggunakan copy-on-write strategi default, sehingga menulis ulang semua file data yang terkena dampak.

Meningkatkan data dalam tabel Iceberg

Upserting data mengacu pada memasukkan catatan data baru dan memperbarui catatan data yang ada dalam satu transaksi. Untuk meningkatkan data ke dalam tabel Iceberg, Anda menggunakan pernyataan tersebut. SQL MERGE INTO 

Contoh berikut meningkatkan isi tabel{UPSERT_TABLE_NAME} di dalam tabel: {TABLE_NAME}

spark.sql(f""" MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t USING {UPSERT_TABLE_NAME} s ON t.c_customer_id = s.c_customer_id WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address WHEN NOT MATCHED THEN INSERT * """)
  • Jika catatan pelanggan yang {UPSERT_TABLE_NAME} sudah ada di dalam {TABLE_NAME} dengan yang samac_customer_id, c_email_address nilai {UPSERT_TABLE_NAME} rekaman akan menggantikan nilai yang ada (operasi pembaruan).

  • Jika catatan pelanggan yang masuk {UPSERT_TABLE_NAME} tidak ada di{TABLE_NAME}, {UPSERT_TABLE_NAME} catatan ditambahkan ke {TABLE_NAME} (operasi sisipkan).

Menghapus data dalam tabel Iceberg

Untuk menghapus data dari tabel Iceberg, gunakan DELETE FROM ekspresi dan tentukan filter yang cocok dengan baris yang akan dihapus.

spark.sql(f""" DELETE FROM {CATALOG_NAME}.{db.name}.{table.name} WHERE c_customer_sk % 2 != 0 """)

Jika filter cocok dengan seluruh partisi, Iceberg melakukan penghapusan metadata saja dan membiarkan file data di tempatnya. Jika tidak, ia hanya menulis ulang file data yang terpengaruh.

Metode delete mengambil file data yang dipengaruhi oleh WHERE klausa dan membuat salinannya tanpa catatan yang dihapus. Kemudian membuat snapshot tabel baru yang menunjuk ke file data baru. Oleh karena itu, catatan yang dihapus masih ada di snapshot tabel yang lebih lama. Misalnya, jika Anda mengambil snapshot sebelumnya dari tabel, Anda akan melihat data yang baru saja Anda hapus. Untuk informasi tentang menghapus snapshot lama yang tidak dibutuhkan dengan file data terkait untuk tujuan pembersihan, lihat bagian Memelihara file dengan menggunakan pemadatan nanti dalam panduan ini.

Membaca data

Anda dapat membaca status terbaru dari tabel Iceberg Anda di Spark dengan Spark SQL dan. DataFrames 

Contoh menggunakan Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5 """)

Contoh menggunakan DataFrames API:

df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)

Menggunakan perjalanan waktu

Setiap operasi tulis (menyisipkan, memperbarui, meningkatkan, menghapus) dalam tabel Iceberg membuat snapshot baru. Anda kemudian dapat menggunakan snapshot ini untuk perjalanan waktu — untuk kembali ke masa lalu dan memeriksa status tabel di masa lalu.

Untuk informasi tentang cara mengambil riwayat snapshot untuk tabel dengan menggunakan snapshot-id dan nilai waktu, lihat bagian Mengakses metadata nanti dalam panduan ini.

Kueri perjalanan waktu berikut menampilkan status tabel berdasarkan spesifiksnapshot-id.

Menggunakan Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id} """)

Menggunakan DataFrames API:

df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \ .format("iceberg") \ .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

Kueri perjalanan waktu berikut menampilkan status tabel berdasarkan snapshot terakhir yang dibuat sebelum stempel waktu tertentu, dalam milidetik (). as-of-timestamp

Menggunakan Spark SQL:

spark.sql(f""" SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}' """)

Menggunakan DataFrames API:

df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \ .format("iceberg") \ .load(f"dev.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

Menggunakan kueri inkremental

Anda juga dapat menggunakan snapshot Iceberg untuk membaca data yang ditambahkan secara bertahap. 

Catatan: Saat ini, operasi ini mendukung pembacaan data dari append snapshot. Itu tidak mendukung pengambilan data dari operasi sepertireplace,overwrite, ataudelete.  Selain itu, operasi baca tambahan tidak didukung dalam sintaks Spark SQL.

Contoh berikut mengambil semua catatan ditambahkan ke tabel Iceberg antara snapshot start-snapshot-id (eksklusif) dan (inklusif). end-snapshot-id

df_incremental = (spark.read.format("iceberg") .option("start-snapshot-id", snapshot_id_start) .option("end-snapshot-id", snapshot_id_end) .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}") )

Mengakses metadata

Iceberg menyediakan akses ke metadata-nya melalui SQL. Anda dapat mengakses metadata untuk setiap tabel yang diberikan (<table_name>) dengan menanyakan namespace. <table_name>.<metadata_table> Untuk daftar lengkap tabel metadata, lihat Memeriksa tabel dalam dokumentasi Gunung Es.

Contoh berikut menunjukkan cara mengakses tabel metadata sejarah Gunung Es, yang menunjukkan riwayat komit (perubahan) untuk tabel Gunung Es. 

Menggunakan Spark SQL (dengan %%sql keajaiban) dari notebook Amazon EMR Studio:

Spark.sql(f“”” SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5 """)

Menggunakan DataFrames API:

spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)

Contoh output:

Contoh keluaran metadata dari tabel Gunung Es