Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Dieser Abschnitt bietet einen Überblick über die Verwendung von Apache Spark für die Interaktion mit Iceberg-Tabellen. Bei den Beispielen handelt es sich um Standardcode, der auf Amazon EMR oder ausgeführt werden kann. AWS Glue
Hinweis: Die primäre Schnittstelle für die Interaktion mit Iceberg-Tabellen ist SQL, daher kombinieren die meisten Beispiele Spark SQL mit der API. DataFrames
Iceberg-Tabellen erstellen und schreiben
Sie können Spark SQL und Spark verwenden, um Daten DataFrames zu Iceberg-Tabellen zu erstellen und hinzuzufügen.
Verwenden von Spark SQL
Verwenden Sie standardmäßige Spark-SQL-Anweisungen wie CREATE TABLE
undINSERT INTO
, um einen Iceberg-Datensatz zu schreiben.
Unpartitionierte Tabellen
Hier ist ein Beispiel für die Erstellung einer unpartitionierten Iceberg-Tabelle mit Spark SQL:
spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg OPTIONS ('format-version'='2') """)
Verwenden Sie eine Standardanweisung, um Daten in eine unpartitionierte Tabelle einzufügen: INSERT
INTO
spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)
Partitionierte Tabellen
Hier ist ein Beispiel für die Erstellung einer partitionierten Iceberg-Tabelle mit Spark SQL:
spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg PARTITIONED BY (c_birth_country) OPTIONS ('format-version'='2') """)
Um Daten mit Spark SQL in eine partitionierte Iceberg-Tabelle einzufügen, führen Sie eine globale Sortierung durch und schreiben dann die Daten:
spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table ORDER BY c_birth_country """)
Verwenden der API DataFrames
Um einen Iceberg-Datensatz zu schreiben, können Sie die DataFrameWriterV2
API verwenden.
Verwenden Sie die Funktion df.writeTo(
t), um eine Iceberg-Tabelle zu erstellen und Daten in diese zu schreiben. Wenn die Tabelle existiert, verwenden Sie die .append()
Funktion. Ist dies nicht .create().
der Fall.createOrReplace()
, verwenden Sie die folgenden Beispiele use, was einer .create()
Variante entsprichtCREATE OR REPLACE TABLE AS
SELECT
.
Unpartitionierte Tabellen
So erstellen und füllen Sie eine unpartitionierte Iceberg-Tabelle mithilfe der API: DataFrameWriterV2
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .tableProperty("format-version", "2") \ .createOrReplace()
So fügen Sie mithilfe der API Daten in eine bestehende unpartitionierte Iceberg-Tabelle ein: DataFrameWriterV2
input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .append()
Partitionierte Tabellen
Um eine partitionierte Iceberg-Tabelle mithilfe der DataFrameWriterV2
API zu erstellen und aufzufüllen, können Sie eine lokale Sortierung verwenden, um Daten aufzunehmen:
input_data.sortWithinPartitions("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .tableProperty("format-version", "2") \ .partitionedBy("c_birth_country") \ .createOrReplace()
Um mithilfe der DataFrameWriterV2
API Daten in eine partitionierte Iceberg-Tabelle einzufügen, können Sie eine globale Sortierung verwenden, um Daten aufzunehmen:
input_data.orderBy("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .append()
Daten in Iceberg-Tabellen aktualisieren
Das folgende Beispiel zeigt, wie Daten in einer Iceberg-Tabelle aktualisiert werden. In diesem Beispiel werden alle Zeilen geändert, die eine gerade Zahl in der c_customer_sk
Spalte haben.
spark.sql(f""" UPDATE {CATALOG_NAME}.{db.name}.{table.name} SET c_email_address = 'even_row' WHERE c_customer_sk % 2 == 0 """)
Dieser Vorgang verwendet die copy-on-write Standardstrategie, sodass alle betroffenen Datendateien neu geschrieben werden.
Daten in Iceberg-Tabellen werden aktualisiert
Daten aktualisieren bezieht sich auf das Einfügen neuer Datensätze und das Aktualisieren vorhandener Datensätze in einer einzigen Transaktion. Um Daten in eine Iceberg-Tabelle hochzuladen, verwenden Sie die Anweisung. SQL
MERGE INTO
Im folgenden Beispiel wird der Inhalt der Tabelle{UPSERT_TABLE_NAME
} innerhalb der Tabelle verschoben: {TABLE_NAME}
spark.sql(f""" MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t USING {UPSERT_TABLE_NAME} s ON t.c_customer_id = s.c_customer_id WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address WHEN NOT MATCHED THEN INSERT * """)
-
Wenn ein Kundendatensatz, der sich in
{UPSERT_TABLE_NAME}
befindet, bereits{TABLE_NAME}
mit demselben vorhanden istc_customer_id
, überschreibt der{UPSERT_TABLE_NAME}
c_email_address
Datensatzwert den vorhandenen Wert (Aktualisierungsvorgang). -
Wenn ein Kundendatensatz, der
{UPSERT_TABLE_NAME}
sich in befindet, nicht existiert{TABLE_NAME}
, wird der{UPSERT_TABLE_NAME}
Datensatz hinzugefügt{TABLE_NAME}
(Vorgang einfügen).
Löschen von Daten in Iceberg-Tabellen
Um Daten aus einer Iceberg-Tabelle zu löschen, verwenden Sie den DELETE FROM
Ausdruck und geben Sie einen Filter an, der den zu löschenden Zeilen entspricht.
spark.sql(f""" DELETE FROM {CATALOG_NAME}.{db.name}.{table.name} WHERE c_customer_sk % 2 != 0 """)
Wenn der Filter auf eine gesamte Partition zutrifft, führt Iceberg eine reine Metadaten-Löschung durch und belässt die Datendateien an Ort und Stelle. Andernfalls werden nur die betroffenen Datendateien neu geschrieben.
Die Methode delete verwendet die Datendateien, die von der WHERE
Klausel betroffen sind, und erstellt eine Kopie davon ohne die gelöschten Datensätze. Anschließend wird ein neuer Tabellen-Snapshot erstellt, der auf die neuen Datendateien verweist. Daher sind die gelöschten Datensätze immer noch in den älteren Snapshots der Tabelle vorhanden. Wenn Sie beispielsweise den vorherigen Snapshot der Tabelle abrufen, werden Ihnen die Daten angezeigt, die Sie gerade gelöscht haben. Informationen zum Entfernen nicht benötigter alter Snapshots mit den zugehörigen Datendateien zu Säuberungszwecken finden Sie im Abschnitt Dateien mithilfe der Komprimierung verwalten weiter unten in diesem Handbuch.
Lesen von Daten
Sie können den aktuellen Status Ihrer Iceberg-Tabellen in Spark sowohl mit Spark SQL als auch lesen. DataFrames
Beispiel mit Spark SQL:
spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5 """)
Beispiel mit der DataFrames API:
df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)
Zeitreisen nutzen
Jeder Schreibvorgang (Einfügen, Aktualisieren, Hochsetzen, Löschen) in einer Iceberg-Tabelle erstellt einen neuen Snapshot. Sie können diese Snapshots dann für Zeitreisen verwenden, um in die Vergangenheit zu reisen und den Status einer Tabelle in der Vergangenheit zu überprüfen.
Informationen zum Abrufen des Verlaufs von Snapshots für Tabellen mithilfe von Werten snapshot-id
und Zeitangaben finden Sie im Abschnitt Zugreifen auf Metadaten weiter unten in diesem Handbuch.
Die folgende Zeitreiseabfrage zeigt den Status einer Tabelle auf der Grundlage eines bestimmten snapshot-id
Status an.
Verwenden von Spark SQL:
spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id} """)
Mithilfe der DataFrames API:
df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \
.format("iceberg") \
.load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \
.limit(5)
Die folgende Zeitreiseabfrage zeigt den Status einer Tabelle auf der Grundlage des letzten Snapshots, der vor einem bestimmten Zeitstempel erstellt wurde, in Millisekunden () an. as-of-timestamp
Verwenden von Spark SQL:
spark.sql(f""" SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}' """)
Mithilfe der DataFrames API:
df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \
.format("iceberg") \
.load(f"dev.{DB_NAME}.{TABLE_NAME}") \
.limit(5)
Verwendung inkrementeller Abfragen
Sie können Iceberg-Snapshots auch verwenden, um angehängte Daten inkrementell zu lesen.
Hinweis: Derzeit unterstützt dieser Vorgang das Lesen von Daten aus Snapshots. append
Das Abrufen von Daten aus Operationen wie replace
overwrite
, oder wird nicht unterstützt. delete
Darüber hinaus werden inkrementelle Lesevorgänge in der Spark-SQL-Syntax nicht unterstützt.
Im folgenden Beispiel werden alle Datensätze abgerufen, die an eine Iceberg-Tabelle zwischen dem Snapshot start-snapshot-id
(exklusiv) und end-snapshot-id
(inklusive) angehängt wurden.
df_incremental = (spark.read.format("iceberg")
.option("start-snapshot-id", snapshot_id_start)
.option("end-snapshot-id", snapshot_id_end)
.load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}")
)
Zugreifen auf Metadaten
Iceberg bietet Zugriff auf seine Metadaten über SQL. Sie können auf die Metadaten für jede beliebige Tabelle (<table_name>
) zugreifen, indem Sie den Namespace abfragen. <table_name>.<metadata_table>
Eine vollständige Liste der Metadatentabellen finden Sie in der Iceberg-Dokumentation unter Tabellen
Das folgende Beispiel zeigt, wie Sie auf die Iceberg-Historien-Metadatentabelle zugreifen können, in der der Verlauf der Commits (Änderungen) für eine Iceberg-Tabelle angezeigt wird.
Verwenden von Spark SQL (mit der %%sql
Magie) von einem Amazon EMR Studio-Notizbuch aus:
Spark.sql(f“””
SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5
""")
Mithilfe der DataFrames API:
spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)
Beispielausgabe:
