Working with Apache Iceberg tables by using Apache Spark

This section provides an overview of using Apache Spark to interact with Iceberg tables. The examples are boilerplate code that can run on Amazon EMR or AWS Glue.

Note: The primary interface for interacting with Iceberg tables is SQL, so most of the examples will combine Spark SQL with the DataFrames API.
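The examples assume a Spark session that's already configured with an Iceberg catalog and with placeholder values for CATALOG_NAME, DB_NAME, and TABLE_NAME. The following is a minimal sketch of such a configuration for the AWS Glue Data Catalog; the catalog name, database, table, and S3 warehouse path are illustrative placeholders, and on Amazon EMR or AWS Glue you can also supply the same properties through the cluster or job configuration instead of in code:

from pyspark.sql import SparkSession

# Placeholder names used throughout the examples in this section.
CATALOG_NAME = "glue_catalog"
DB_NAME = "iceberg_db"
TABLE_NAME = "customer"

spark = (
    SparkSession.builder
    # Enable Iceberg's SQL extensions (MERGE INTO, time travel syntax, and so on).
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    # Register an Iceberg catalog backed by the AWS Glue Data Catalog.
    .config(f"spark.sql.catalog.{CATALOG_NAME}", "org.apache.iceberg.spark.SparkCatalog")
    .config(f"spark.sql.catalog.{CATALOG_NAME}.catalog-impl", "org.apache.iceberg.aws.glue.GlueCatalog")
    .config(f"spark.sql.catalog.{CATALOG_NAME}.io-impl", "org.apache.iceberg.aws.s3.S3FileIO")
    # Placeholder S3 location for table data and metadata.
    .config(f"spark.sql.catalog.{CATALOG_NAME}.warehouse", "s3://amzn-s3-demo-bucket/iceberg/")
    .getOrCreate()
)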

Creating and writing Iceberg tables

You can use Spark SQL and Spark DataFrames to create and add data to Iceberg tables.

Using Spark SQL

To write an Iceberg dataset, use standard Spark SQL statements such as CREATE TABLE and INSERT INTO.

Unpartitioned tables

Here's an example of creating an unpartitioned Iceberg table with Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg OPTIONS ('format-version'='2') """)

To insert data into an unpartitioned table, use a standard INSERT INTO statement:

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)

Partitioned tables

Here's an example of creating a partitioned Iceberg table with Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg PARTITIONED BY (c_birth_country) OPTIONS ('format-version'='2') """)

To insert data into a partitioned Iceberg table with Spark SQL, perform a global sort on the partition column before writing, so that the rows for each partition are grouped together:

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table ORDER BY c_birth_country """)

Using the DataFrames API

To write an Iceberg dataset, you can use the DataFrameWriterV2 API.

To create an Iceberg table and write data to it, use the df.writeTo(t) function. If the table exists, use the .append() function. If it doesn't, use .create(). The following examples use .createOrReplace(), which is a variation of .create() that's equivalent to CREATE OR REPLACE TABLE AS SELECT.

Unpartitioned tables

To create and populate an unpartitioned Iceberg table by using the DataFrameWriterV2 API:

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .tableProperty("format-version", "2") \ .createOrReplace()

To insert data into an existing unpartitioned Iceberg table by using the DataFrameWriterV2 API:

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .append()

Partitioned tables

To create and populate a partitioned Iceberg table by using the DataFrameWriterV2 API, you can use a local sort to ingest data:

input_data.sortWithinPartitions("c_birth_country") \
    .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \
    .tableProperty("format-version", "2") \
    .partitionedBy("c_birth_country") \
    .createOrReplace()

To insert data into a partitioned Iceberg table by using the DataFrameWriterV2 API, you can use a global sort to ingest data:

input_data.orderBy("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .append()

Updating data in Iceberg tables

The following example shows how to update data in an Iceberg table. This example modifies all rows that have an even number in the c_customer_sk column.

spark.sql(f""" UPDATE {CATALOG_NAME}.{db.name}.{table.name} SET c_email_address = 'even_row' WHERE c_customer_sk % 2 == 0 """)

This operation uses the default copy-on-write strategy, so it rewrites all impacted data files.
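If you prefer that updates write delete files instead of rewriting the affected data files, Iceberg also supports a merge-on-read strategy that you can enable through table properties. The following is a minimal sketch, using the same placeholder names as the previous examples; the corresponding properties for deletes and merges are 'write.delete.mode' and 'write.merge.mode':

# Switch UPDATE statements on this table from copy-on-write (the default) to merge-on-read.
spark.sql(f"""
    ALTER TABLE {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}
    SET TBLPROPERTIES ('write.update.mode'='merge-on-read')
""")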

Upserting data in Iceberg tables

Upserting data refers to inserting new data records and updating existing data records in a single transaction. To upsert data into an Iceberg table, you use the SQL MERGE INTO statement. 

The following example upserts the content of the table {UPSERT_TABLE_NAME} into the table {TABLE_NAME}:

spark.sql(f""" MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t USING {UPSERT_TABLE_NAME} s ON t.c_customer_id = s.c_customer_id WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address WHEN NOT MATCHED THEN INSERT * """)
  • If a customer record that's in {UPSERT_TABLE_NAME} already exists in {TABLE_NAME} with the same c_customer_id, the {UPSERT_TABLE_NAME} record c_email_address value overrides the existing value (update operation).

  • If a customer record that's in {UPSERT_TABLE_NAME} doesn't exist in {TABLE_NAME}, the {UPSERT_TABLE_NAME} record is added to {TABLE_NAME} (insert operation).
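The MERGE INTO source can be any table or view that Spark can resolve, so it doesn't have to be another Iceberg table. If the new and changed records arrive as a DataFrame, one common pattern is to register the DataFrame as a temporary view and use that view as the source. The following is a minimal sketch; updates_df and the view name customer_updates are hypothetical:

# updates_df is a hypothetical DataFrame that holds the new and changed customer records.
updates_df.createOrReplaceTempView("customer_updates")

# The temporary view can now be referenced as the MERGE INTO source in the statement above.
UPSERT_TABLE_NAME = "customer_updates"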

Deleting data in Iceberg tables

To delete data from an Iceberg table, use the DELETE FROM expression and specify a filter that matches the rows to delete.

spark.sql(f""" DELETE FROM {CATALOG_NAME}.{db.name}.{table.name} WHERE c_customer_sk % 2 != 0 """)

If the filter matches an entire partition, Iceberg performs a metadata-only delete and leaves the data files in place. Otherwise, it rewrites only the affected data files.

The delete method takes the data files that are impacted by the WHERE clause and creates a copy of them without the deleted records. It then creates a new table snapshot that points to the new data files. Therefore, the deleted records are still present in the older snapshots of the table. For example, if you retrieve the previous snapshot of the table, you'll see the data that you just deleted. For information about removing unneeded old snapshots with the related data files for cleanup purposes, see the section Maintaining files by using compaction later in this guide.

Reading data

You can read the latest status of your Iceberg tables in Spark with both Spark SQL and DataFrames. 

Example using Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5 """)

Example using the DataFrames API:

df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)

Using time travel

Each write operation (insert, update, upsert, delete) in an Iceberg table creates a new snapshot. You can then use these snapshots for time travel—to go back in time and check the status of a table in the past.

For information about how to retrieve the history of snapshots for tables by using snapshot-id and timing values, see the Accessing metadata section later in this guide.
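For example, the following is a minimal sketch of retrieving a snapshot ID and its commit timestamp from the history metadata table so that they can be used in the time travel queries that follow; the variable names snapshot_id and snapshot_ts are illustrative:

# Read the table history and take the earliest snapshot.
# made_current_at and snapshot_id are columns of the Iceberg history metadata table.
first_commit = (
    spark.read.format("iceberg")
    .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history")
    .orderBy("made_current_at")
    .first()
)

snapshot_id = first_commit["snapshot_id"]
# The as-of-timestamp read option expects milliseconds since the epoch.
snapshot_ts = int(first_commit["made_current_at"].timestamp() * 1000)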

The following time travel query displays the status of a table based on a specific snapshot-id.

Using Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id} """)

Using the DataFrames API:

df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \
    .format("iceberg") \
    .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \
    .limit(5)

The following time travel query displays the status of a table based on the last snapshot that was created before a specific timestamp. In the DataFrames API, the as-of-timestamp option takes this timestamp in milliseconds.

Using Spark SQL:

spark.sql(f""" SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}' """)

Using the DataFrames API:

df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \
    .format("iceberg") \
    .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \
    .limit(5)

Using incremental queries

You can also use Iceberg snapshots to read appended data incrementally. 

Note: Currently, this operation supports reading data from append snapshots. It doesn't support fetching data from operations such as replace, overwrite, or delete. Additionally, incremental read operations aren't supported in the Spark SQL syntax.

The following example retrieves all the records appended to an Iceberg table between the snapshot start-snapshot-id (exclusive) and end-snapshot-id (inclusive).

df_incremental = (
    spark.read.format("iceberg")
    .option("start-snapshot-id", snapshot_id_start)
    .option("end-snapshot-id", snapshot_id_end)
    .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}")
)

Accessing metadata

Iceberg provides access to its metadata through SQL. You can access the metadata for any given table (<table_name>) by querying the namespace <table_name>.<metadata_table>. For a complete list of metadata tables, see Inspecting tables in the Iceberg documentation.

The following example shows how to access the Iceberg history metadata table, which shows the history of commits (changes) for an Iceberg table. 

Using Spark SQL (for example, from an Amazon EMR Studio notebook):

spark.sql(f"""
    SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5
""")

Using the DataFrames API:

spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)

Sample output:

Sample metadata output from an Iceberg table
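Other metadata tables are available through the same namespace. For example, the following is a minimal sketch of querying the snapshots metadata table, which lists each snapshot together with its commit time, operation type (such as append or overwrite), and summary; the placeholder names are the same as in the previous examples:

spark.sql(f"""
    SELECT committed_at, snapshot_id, operation, summary
    FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.snapshots
    LIMIT 5
""").show(truncate=False)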