Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Trabajar con tablas de Apache Iceberg mediante Apache Spark
En esta sección se proporciona una descripción general del uso de Apache Spark para interactuar con las tablas Iceberg. Los ejemplos son código repetitivo que se puede ejecutar en Amazon EMR o. AWS Glue
Nota: La interfaz principal para interactuar con las tablas de Iceberg es SQL, por lo que la mayoría de los ejemplos combinarán Spark SQL con la API. DataFrames
Crear y escribir tablas Iceberg
Puedes usar Spark SQL y Spark DataFrames para crear y añadir datos a las tablas de Iceberg.
Uso de Spark SQL
Para escribir un conjunto de datos de Iceberg, usa sentencias SQL estándar de Spark, como CREATE TABLE
yINSERT INTO
.
Tablas sin particionar
Este es un ejemplo de cómo crear una tabla Iceberg sin particiones con Spark SQL:
spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg OPTIONS ('format-version'='2') """)
Para insertar datos en una tabla sin particiones, usa una declaración estándar: INSERT
INTO
spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)
Tablas particionadas
Este es un ejemplo de cómo crear una tabla Iceberg particionada con Spark SQL:
spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg PARTITIONED BY (c_birth_country) OPTIONS ('format-version'='2') """)
Para insertar datos en una tabla Iceberg particionada con Spark SQL, realiza una ordenación global y, a continuación, escribe los datos:
spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table ORDER BY c_birth_country """)
Uso de la API DataFrames
Para escribir un conjunto de datos de Iceberg, puedes usar la DataFrameWriterV2
API.
Para crear una tabla de iceberg y escribir datos en ella, usa la función df.writeTo(
t). Si la tabla existe, utilice la .append()
función. Si no es así, usa .create().
Los siguientes ejemplos usan.createOrReplace()
, que es una variación de lo .create()
que equivale aCREATE OR REPLACE TABLE AS
SELECT
.
Tablas sin particionar
Para crear y rellenar una tabla de Iceberg sin particiones mediante la API: DataFrameWriterV2
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .tableProperty("format-version", "2") \ .createOrReplace()
Para insertar datos en una tabla de Iceberg sin particiones existente mediante la API: DataFrameWriterV2
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .append()
Tablas particionadas
Para crear y rellenar una tabla Iceberg particionada mediante la DataFrameWriterV2
API, puedes usar una ordenación local para ingerir datos:
input_data.sortWithinPartitions("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .tableProperty("format-version", "2") \ .partitionedBy("c_birth_country") \ .createOrReplace()
Para insertar datos en una tabla Iceberg particionada mediante la DataFrameWriterV2
API, puedes usar una ordenación global para ingerir los datos:
input_data.orderBy("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .append()
Actualización de datos en tablas de iceberg
En el siguiente ejemplo, se muestra cómo actualizar los datos de una tabla de iceberg. En este ejemplo se modifican todas las filas que tienen un número par en la c_customer_sk
columna.
spark.sql(f""" UPDATE {CATALOG_NAME}.{db.name}.{table.name} SET c_email_address = 'even_row' WHERE c_customer_sk % 2 == 0 """)
Esta operación utiliza la copy-on-write estrategia predeterminada, por lo que reescribe todos los archivos de datos afectados.
Alterar los datos de las tablas de Iceberg
La alteración de los datos consiste en insertar nuevos registros de datos y actualizar los registros de datos existentes en una sola transacción. Para descomponer los datos en una tabla de iceberg, se utiliza la declaración. SQL
MERGE INTO
El siguiente ejemplo altera el contenido de la tabla} dentro de la tabla{UPSERT_TABLE_NAME
: {TABLE_NAME}
spark.sql(f""" MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t USING {UPSERT_TABLE_NAME} s ON t.c_customer_id = s.c_customer_id WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address WHEN NOT MATCHED THEN INSERT * """)
-
Si un registro de cliente que está
{UPSERT_TABLE_NAME}
ya existe{TABLE_NAME}
con el mismoc_customer_id
, el valor del{UPSERT_TABLE_NAME}
registro anula elc_email_address
valor existente (operación de actualización). -
Si un registro de cliente incluido
{UPSERT_TABLE_NAME}
no existe en{TABLE_NAME}
, el{UPSERT_TABLE_NAME}
registro se agrega a{TABLE_NAME}
(operación de inserción).
Eliminar datos de las tablas de Iceberg
Para eliminar datos de una tabla de iceberg, utilice la DELETE FROM
expresión y especifique un filtro que coincida con las filas que desee eliminar.
spark.sql(f""" DELETE FROM {CATALOG_NAME}.{db.name}.{table.name} WHERE c_customer_sk % 2 != 0 """)
Si el filtro coincide con una partición completa, Iceberg elimina solo los metadatos y deja los archivos de datos en su lugar. De lo contrario, solo reescribe los archivos de datos afectados.
El método de eliminación toma los archivos de datos afectados por la WHERE
cláusula y crea una copia de los mismos sin los registros eliminados. A continuación, crea una nueva instantánea de la tabla que apunta a los nuevos archivos de datos. Por lo tanto, los registros eliminados siguen presentes en las instantáneas anteriores de la tabla. Por ejemplo, si recupera la instantánea anterior de la tabla, verá los datos que acaba de eliminar. Para obtener información sobre cómo eliminar instantáneas antiguas innecesarias con los archivos de datos relacionados con fines de limpieza, consulte la sección Mantenimiento de archivos mediante la compactación, que aparece más adelante en esta guía.
Lectura de datos
Puedes leer el estado más reciente de tus tablas de Iceberg en Spark tanto con Spark SQL como con. DataFrames
Ejemplo de uso de Spark SQL:
spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5 """)
Ejemplo de uso de la DataFrames API:
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
Uso del viaje en el tiempo
Cada operación de escritura (insertar, actualizar, modificar, eliminar) en una tabla de Iceberg crea una nueva instantánea. A continuación, puede utilizar estas instantáneas para viajar en el tiempo, es decir, para retroceder en el tiempo y comprobar el estado de una tabla en el pasado.
Para obtener información sobre cómo recuperar el historial de las instantáneas de las tablas mediante valores de uso snapshot-id
y temporización, consulte la sección Acceso a los metadatos que aparece más adelante en esta guía.
La siguiente consulta de viaje en el tiempo muestra el estado de una tabla en función de un dato específicosnapshot-id
.
Uso de Spark SQL:
spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id} """)
Uso de la DataFrames API:
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \ .format("iceberg") \ .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ .limit(5)
La siguiente consulta de viaje en el tiempo muestra el estado de una tabla en función de la última instantánea que se creó antes de una marca de tiempo específica, en milisegundos ()as-of-timestamp
.
Uso de Spark SQL:
spark.sql(f""" SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}' """)
Uso de la DataFrames API:
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \ .format("iceberg") \ .load(f"dev.{DB_NAME}.{TABLE_NAME}") \ .limit(5)
Uso de consultas incrementales
También puede utilizar las instantáneas de Iceberg para leer los datos adjuntos de forma incremental.
Nota: Actualmente, esta operación admite la lectura de datos de instantáneas. append
No admite la obtención de datos de operaciones como replace
overwrite
, o. delete
Además, las operaciones de lectura incremental no se admiten en la sintaxis SQL de Spark.
En el siguiente ejemplo, se recuperan todos los registros adjuntos a una tabla de Iceberg entre la instantánea start-snapshot-id
(exclusiva) y la end-snapshot-id
(incluida).
df_incremental = (spark.read.format("iceberg") .option("start-snapshot-id", snapshot_id_start) .option("end-snapshot-id", snapshot_id_end) .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}") )
Acceder a los metadatos
Iceberg proporciona acceso a sus metadatos a través de SQL. Puede acceder a los metadatos de cualquier tabla (<table_name>
) consultando el espacio de nombres. <table_name>.<metadata_table>
Para obtener una lista completa de las tablas de metadatos, consulte Inspección de tablas
El siguiente ejemplo muestra cómo acceder a la tabla de metadatos del historial de Iceberg, que muestra el historial de confirmaciones (cambios) de una tabla de Iceberg.
Uso de Spark SQL (con la %%sql
magia) desde una libreta Amazon EMR Studio:
Spark.sql(f“”” SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5 """)
Uso de la DataFrames API:
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
Resultado de ejemplo: