Trabajar con tablas de Apache Iceberg mediante Apache Spark - AWS Recomendaciones de

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Trabajar con tablas de Apache Iceberg mediante Apache Spark

En esta sección se proporciona una descripción general del uso de Apache Spark para interactuar con las tablas Iceberg. Los ejemplos son código repetitivo que se puede ejecutar en Amazon EMR o. AWS Glue

Nota: La interfaz principal para interactuar con las tablas de Iceberg es SQL, por lo que la mayoría de los ejemplos combinarán Spark SQL con la API. DataFrames

Crear y escribir tablas Iceberg

Puedes usar Spark SQL y Spark DataFrames para crear y añadir datos a las tablas de Iceberg.

Uso de Spark SQL

Para escribir un conjunto de datos de Iceberg, usa sentencias SQL estándar de Spark, como CREATE TABLE yINSERT INTO.

Tablas sin particionar

Este es un ejemplo de cómo crear una tabla Iceberg sin particiones con Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg OPTIONS ('format-version'='2') """)

Para insertar datos en una tabla sin particiones, usa una declaración estándar: INSERT INTO

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)

Tablas particionadas

Este es un ejemplo de cómo crear una tabla Iceberg particionada con Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg PARTITIONED BY (c_birth_country) OPTIONS ('format-version'='2') """)

Para insertar datos en una tabla Iceberg particionada con Spark SQL, realiza una ordenación global y, a continuación, escribe los datos:

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table ORDER BY c_birth_country """)

Uso de la API DataFrames

Para escribir un conjunto de datos de Iceberg, puedes usar la DataFrameWriterV2 API.

Para crear una tabla de iceberg y escribir datos en ella, usa la función df.writeTo( t). Si la tabla existe, utilice la .append() función. Si no es así, usa .create(). Los siguientes ejemplos usan.createOrReplace(), que es una variación de lo .create() que equivale aCREATE OR REPLACE TABLE AS SELECT.

Tablas sin particionar

Para crear y rellenar una tabla de Iceberg sin particiones mediante la API: DataFrameWriterV2

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .tableProperty("format-version", "2") \ .createOrReplace()

Para insertar datos en una tabla de Iceberg sin particiones existente mediante la API: DataFrameWriterV2

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .append()

Tablas particionadas

Para crear y rellenar una tabla Iceberg particionada mediante la DataFrameWriterV2 API, puedes usar una ordenación local para ingerir datos:

input_data.sortWithinPartitions("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .tableProperty("format-version", "2") \ .partitionedBy("c_birth_country") \ .createOrReplace()

Para insertar datos en una tabla Iceberg particionada mediante la DataFrameWriterV2 API, puedes usar una ordenación global para ingerir los datos:

input_data.orderBy("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .append()

Actualización de datos en tablas de iceberg

En el siguiente ejemplo, se muestra cómo actualizar los datos de una tabla de iceberg. En este ejemplo se modifican todas las filas que tienen un número par en la c_customer_sk columna.

spark.sql(f""" UPDATE {CATALOG_NAME}.{db.name}.{table.name} SET c_email_address = 'even_row' WHERE c_customer_sk % 2 == 0 """)

Esta operación utiliza la copy-on-write estrategia predeterminada, por lo que reescribe todos los archivos de datos afectados.

Alterar los datos de las tablas de Iceberg

La alteración de los datos consiste en insertar nuevos registros de datos y actualizar los registros de datos existentes en una sola transacción. Para descomponer los datos en una tabla de iceberg, se utiliza la declaración. SQL MERGE INTO 

El siguiente ejemplo altera el contenido de la tabla} dentro de la tabla{UPSERT_TABLE_NAME: {TABLE_NAME}

spark.sql(f""" MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t USING {UPSERT_TABLE_NAME} s ON t.c_customer_id = s.c_customer_id WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address WHEN NOT MATCHED THEN INSERT * """)
  • Si un registro de cliente que está {UPSERT_TABLE_NAME} ya existe {TABLE_NAME} con el mismoc_customer_id, el valor del {UPSERT_TABLE_NAME} registro anula el c_email_address valor existente (operación de actualización).

  • Si un registro de cliente incluido {UPSERT_TABLE_NAME} no existe en{TABLE_NAME}, el {UPSERT_TABLE_NAME} registro se agrega a {TABLE_NAME} (operación de inserción).

Eliminar datos de las tablas de Iceberg

Para eliminar datos de una tabla de iceberg, utilice la DELETE FROM expresión y especifique un filtro que coincida con las filas que desee eliminar.

spark.sql(f""" DELETE FROM {CATALOG_NAME}.{db.name}.{table.name} WHERE c_customer_sk % 2 != 0 """)

Si el filtro coincide con una partición completa, Iceberg elimina solo los metadatos y deja los archivos de datos en su lugar. De lo contrario, solo reescribe los archivos de datos afectados.

El método de eliminación toma los archivos de datos afectados por la WHERE cláusula y crea una copia de los mismos sin los registros eliminados. A continuación, crea una nueva instantánea de la tabla que apunta a los nuevos archivos de datos. Por lo tanto, los registros eliminados siguen presentes en las instantáneas anteriores de la tabla. Por ejemplo, si recupera la instantánea anterior de la tabla, verá los datos que acaba de eliminar. Para obtener información sobre cómo eliminar instantáneas antiguas innecesarias con los archivos de datos relacionados con fines de limpieza, consulte la sección Mantenimiento de archivos mediante la compactación, que aparece más adelante en esta guía.

Lectura de datos

Puedes leer el estado más reciente de tus tablas de Iceberg en Spark tanto con Spark SQL como con. DataFrames 

Ejemplo de uso de Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5 """)

Ejemplo de uso de la DataFrames API:

df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)

Uso del viaje en el tiempo

Cada operación de escritura (insertar, actualizar, modificar, eliminar) en una tabla de Iceberg crea una nueva instantánea. A continuación, puede utilizar estas instantáneas para viajar en el tiempo, es decir, para retroceder en el tiempo y comprobar el estado de una tabla en el pasado.

Para obtener información sobre cómo recuperar el historial de las instantáneas de las tablas mediante valores de uso snapshot-id y temporización, consulte la sección Acceso a los metadatos que aparece más adelante en esta guía.

La siguiente consulta de viaje en el tiempo muestra el estado de una tabla en función de un dato específicosnapshot-id.

Uso de Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id} """)

Uso de la DataFrames API:

df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \ .format("iceberg") \ .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

La siguiente consulta de viaje en el tiempo muestra el estado de una tabla en función de la última instantánea que se creó antes de una marca de tiempo específica, en milisegundos ()as-of-timestamp.

Uso de Spark SQL:

spark.sql(f""" SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}' """)

Uso de la DataFrames API:

df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \ .format("iceberg") \ .load(f"dev.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

Uso de consultas incrementales

También puede utilizar las instantáneas de Iceberg para leer los datos adjuntos de forma incremental. 

Nota: Actualmente, esta operación admite la lectura de datos de instantáneas. append No admite la obtención de datos de operaciones como replaceoverwrite, o. delete  Además, las operaciones de lectura incremental no se admiten en la sintaxis SQL de Spark.

En el siguiente ejemplo, se recuperan todos los registros adjuntos a una tabla de Iceberg entre la instantánea start-snapshot-id (exclusiva) y la end-snapshot-id (incluida).

df_incremental = (spark.read.format("iceberg") .option("start-snapshot-id", snapshot_id_start) .option("end-snapshot-id", snapshot_id_end) .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}") )

Acceder a los metadatos

Iceberg proporciona acceso a sus metadatos a través de SQL. Puede acceder a los metadatos de cualquier tabla (<table_name>) consultando el espacio de nombres. <table_name>.<metadata_table> Para obtener una lista completa de las tablas de metadatos, consulte Inspección de tablas en la documentación de Iceberg.

El siguiente ejemplo muestra cómo acceder a la tabla de metadatos del historial de Iceberg, que muestra el historial de confirmaciones (cambios) de una tabla de Iceberg. 

Uso de Spark SQL (con la %%sql magia) desde una libreta Amazon EMR Studio:

Spark.sql(f“”” SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5 """)

Uso de la DataFrames API:

spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)

Resultado de ejemplo:

Ejemplo de salida de metadatos de una tabla Iceberg