Lavorare con le tabelle Apache Iceberg utilizzando Apache Spark - AWS Guida prescrittiva

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Lavorare con le tabelle Apache Iceberg utilizzando Apache Spark

Questa sezione fornisce una panoramica sull'uso di Apache Spark per interagire con le tabelle Iceberg. Gli esempi sono codice standard che può essere eseguito su Amazon EMR o. AWS Glue

Nota: l'interfaccia principale per l'interazione con le tabelle Iceberg è SQL, quindi la maggior parte degli esempi combinerà Spark SQL con l'API. DataFrames

Creazione e scrittura di tabelle Iceberg

Puoi usare Spark SQL e Spark DataFrames per creare e aggiungere dati alle tabelle Iceberg.

Usare Spark SQL

Per scrivere un set di dati Iceberg, usa istruzioni SQL Spark standard come e. CREATE TABLE INSERT INTO

Tabelle non partizionate

Ecco un esempio di creazione di una tabella Iceberg non partizionata con Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg OPTIONS ('format-version'='2') """)

Per inserire dati in una tabella non partizionata, usa un'istruzione standard: INSERT INTO

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)

Tabelle partizionate

Ecco un esempio di creazione di una tabella Iceberg partizionata con Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg PARTITIONED BY (c_birth_country) OPTIONS ('format-version'='2') """)

Per inserire dati in una tabella Iceberg partizionata con Spark SQL, esegui un ordinamento globale e poi scrivi i dati:

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table ORDER BY c_birth_country """)

Utilizzando l'API DataFrames

Per scrivere un set di dati Iceberg, puoi utilizzare l'DataFrameWriterV2API.

Per creare una tabella Iceberg e scrivere dati su di essa, usa la funzione df.writeTo( t). Se la tabella esiste, usa la .append() funzione. In caso contrario, usa .create(). Gli esempi seguenti usano.createOrReplace(), che è una variante di .create() che è equivalente aCREATE OR REPLACE TABLE AS SELECT.

Tabelle non partizionate

Per creare e popolare una tabella Iceberg non partizionata utilizzando l'API: DataFrameWriterV2

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .tableProperty("format-version", "2") \ .createOrReplace()

Per inserire dati in una tabella Iceberg non partizionata esistente utilizzando l'API: DataFrameWriterV2

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .append()

Tabelle partizionate

Per creare e popolare una tabella Iceberg partizionata utilizzando l'DataFrameWriterV2API, puoi utilizzare un ordinamento locale per importare i dati:

input_data.sortWithinPartitions("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .tableProperty("format-version", "2") \ .partitionedBy("c_birth_country") \ .createOrReplace()

Per inserire dati in una tabella Iceberg partizionata utilizzando l'DataFrameWriterV2API, puoi utilizzare un ordinamento globale per inserire i dati:

input_data.orderBy("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .append()

Aggiornamento dei dati nelle tabelle Iceberg

L'esempio seguente mostra come aggiornare i dati in una tabella Iceberg. Questo esempio modifica tutte le righe che hanno un numero pari nella c_customer_sk colonna.

spark.sql(f""" UPDATE {CATALOG_NAME}.{db.name}.{table.name} SET c_email_address = 'even_row' WHERE c_customer_sk % 2 == 0 """)

Questa operazione utilizza la copy-on-write strategia predefinita, quindi riscrive tutti i file di dati interessati.

Sconvolgimento dei dati nelle tabelle Iceberg

L'alterazione dei dati si riferisce all'inserimento di nuovi record di dati e all'aggiornamento dei record di dati esistenti in un'unica transazione. Per trasformare i dati in una tabella Iceberg, si utilizza l'istruzione. SQL MERGE INTO 

L'esempio seguente inverte il contenuto della tabella} all'interno della tabella{UPSERT_TABLE_NAME: {TABLE_NAME}

spark.sql(f""" MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t USING {UPSERT_TABLE_NAME} s ON t.c_customer_id = s.c_customer_id WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address WHEN NOT MATCHED THEN INSERT * """)
  • Se un record cliente presente in esiste {UPSERT_TABLE_NAME} già in {TABLE_NAME} con lo stessoc_customer_id, il valore del {UPSERT_TABLE_NAME} record sostituisce il c_email_address valore esistente (operazione di aggiornamento).

  • Se un record del cliente presente in {UPSERT_TABLE_NAME} non esiste in{TABLE_NAME}, il {UPSERT_TABLE_NAME} record viene aggiunto a {TABLE_NAME} (operazione di inserimento).

Eliminazione dei dati nelle tabelle Iceberg

Per eliminare i dati da una tabella Iceberg, utilizzate l'DELETE FROMespressione e specificate un filtro che corrisponda alle righe da eliminare.

spark.sql(f""" DELETE FROM {CATALOG_NAME}.{db.name}.{table.name} WHERE c_customer_sk % 2 != 0 """)

Se il filtro corrisponde a un'intera partizione, Iceberg elimina solo i metadati e lascia i file di dati al loro posto. Altrimenti, riscrive solo i file di dati interessati.

Il metodo delete prende i file di dati interessati dalla WHERE clausola e ne crea una copia senza i record eliminati. Quindi crea una nuova istantanea della tabella che punta ai nuovi file di dati. Pertanto, i record eliminati sono ancora presenti nelle istantanee precedenti della tabella. Ad esempio, se recuperi l'istantanea precedente della tabella, vedrai i dati che hai appena eliminato. Per informazioni sulla rimozione di vecchie istantanee non necessarie con i relativi file di dati per scopi di pulizia, consulta la sezione Manutenzione dei file utilizzando la compattazione più avanti in questa guida.

Lettura dei dati

Puoi leggere lo stato più recente delle tue tabelle Iceberg in Spark sia con Spark SQL che. DataFrames 

Esempio di utilizzo di Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5 """)

Esempio di utilizzo dell' DataFrames API:

df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)

Utilizzo del viaggio nel tempo

Ogni operazione di scrittura (inserimento, aggiornamento, annullamento, eliminazione) in una tabella Iceberg crea una nuova istantanea. È quindi possibile utilizzare queste istantanee per viaggiare nel tempo, per tornare indietro nel tempo e controllare lo stato di una tabella nel passato.

Per informazioni su come recuperare la cronologia delle istantanee per le tabelle utilizzando snapshot-id e temporizzando i valori, consultate la sezione Accesso ai metadati più avanti in questa guida.

La seguente query sui viaggi nel tempo mostra lo stato di una tabella in base a uno specifico. snapshot-id

Usando Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id} """)

Utilizzo dell' DataFrames API:

df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \ .format("iceberg") \ .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

La seguente query sul viaggio nel tempo mostra lo stato di una tabella in base all'ultima istantanea creata prima di un timestamp specifico, in millisecondi (). as-of-timestamp

Usando Spark SQL:

spark.sql(f""" SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}' """)

Utilizzo dell' DataFrames API:

df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \ .format("iceberg") \ .load(f"dev.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

Utilizzo di query incrementali

È inoltre possibile utilizzare le istantanee Iceberg per leggere i dati aggiunti in modo incrementale. 

Nota: attualmente, questa operazione supporta la lettura di dati da istantanee. append Non supporta il recupero di dati da operazioni come replaceoverwrite, o. delete  Inoltre, le operazioni di lettura incrementali non sono supportate nella sintassi SQL di Spark.

L'esempio seguente recupera tutti i record aggiunti a una tabella Iceberg compresi tra l'istantanea start-snapshot-id (esclusiva) e (inclusa). end-snapshot-id

df_incremental = (spark.read.format("iceberg") .option("start-snapshot-id", snapshot_id_start) .option("end-snapshot-id", snapshot_id_end) .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}") )

Accesso ai metadati

Iceberg fornisce l'accesso ai propri metadati tramite SQL. È possibile accedere ai metadati per ogni tabella (<table_name>) interrogando il namespace. <table_name>.<metadata_table> Per un elenco completo delle tabelle di metadati, consulta Ispezione delle tabelle nella documentazione di Iceberg.

L'esempio seguente mostra come accedere alla tabella dei metadati della cronologia di Iceberg, che mostra la cronologia dei commit (modifiche) per una tabella Iceberg. 

Utilizzando Spark SQL (con la %%sql magia) da un notebook Amazon EMR Studio:

Spark.sql(f“”” SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5 """)

Utilizzo dell'API: DataFrames

spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)

Output di esempio:

Esempio di metadati in uscita da una tabella Iceberg