Wählen Sie Ihre Cookie-Einstellungen aus

Wir verwenden essentielle Cookies und ähnliche Tools, die für die Bereitstellung unserer Website und Services erforderlich sind. Wir verwenden Performance-Cookies, um anonyme Statistiken zu sammeln, damit wir verstehen können, wie Kunden unsere Website nutzen, und Verbesserungen vornehmen können. Essentielle Cookies können nicht deaktiviert werden, aber Sie können auf „Anpassen“ oder „Ablehnen“ klicken, um Performance-Cookies abzulehnen.

Wenn Sie damit einverstanden sind, verwenden AWS und zugelassene Drittanbieter auch Cookies, um nützliche Features der Website bereitzustellen, Ihre Präferenzen zu speichern und relevante Inhalte, einschließlich relevanter Werbung, anzuzeigen. Um alle nicht notwendigen Cookies zu akzeptieren oder abzulehnen, klicken Sie auf „Akzeptieren“ oder „Ablehnen“. Um detailliertere Entscheidungen zu treffen, klicken Sie auf „Anpassen“.

Arbeiten mit Apache Iceberg-Tabellen mithilfe von Apache Spark

Fokusmodus
Arbeiten mit Apache Iceberg-Tabellen mithilfe von Apache Spark - AWS Präskriptive Leitlinien

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Dieser Abschnitt bietet einen Überblick über die Verwendung von Apache Spark für die Interaktion mit Iceberg-Tabellen. Bei den Beispielen handelt es sich um Standardcode, der auf Amazon EMR oder ausgeführt werden kann. AWS Glue

Hinweis: Die primäre Schnittstelle für die Interaktion mit Iceberg-Tabellen ist SQL, daher kombinieren die meisten Beispiele Spark SQL mit der API. DataFrames

Iceberg-Tabellen erstellen und schreiben

Sie können Spark SQL und Spark verwenden, um Daten DataFrames zu Iceberg-Tabellen zu erstellen und hinzuzufügen.

Verwenden von Spark SQL

Verwenden Sie standardmäßige Spark-SQL-Anweisungen wie CREATE TABLE undINSERT INTO, um einen Iceberg-Datensatz zu schreiben.

Unpartitionierte Tabellen

Hier ist ein Beispiel für die Erstellung einer unpartitionierten Iceberg-Tabelle mit Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg OPTIONS ('format-version'='2') """)

Verwenden Sie eine Standardanweisung, um Daten in eine unpartitionierte Tabelle einzufügen: INSERT INTO

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table """)

Partitionierte Tabellen

Hier ist ein Beispiel für die Erstellung einer partitionierten Iceberg-Tabelle mit Spark SQL:

spark.sql(f""" CREATE TABLE IF NOT EXISTS {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions ( c_customer_sk int, c_customer_id string, c_first_name string, c_last_name string, c_birth_country string, c_email_address string) USING iceberg PARTITIONED BY (c_birth_country) OPTIONS ('format-version'='2') """)

Um Daten mit Spark SQL in eine partitionierte Iceberg-Tabelle einzufügen, führen Sie eine globale Sortierung durch und schreiben dann die Daten:

spark.sql(f""" INSERT INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions SELECT c_customer_sk, c_customer_id, c_first_name, c_last_name, c_birth_country, c_email_address FROM another_table ORDER BY c_birth_country """)

Verwenden der API DataFrames

Um einen Iceberg-Datensatz zu schreiben, können Sie die DataFrameWriterV2 API verwenden.

Verwenden Sie die Funktion df.writeTo( t), um eine Iceberg-Tabelle zu erstellen und Daten in diese zu schreiben. Wenn die Tabelle existiert, verwenden Sie die .append() Funktion. Ist dies nicht .create(). der Fall.createOrReplace(), verwenden Sie die folgenden Beispiele use, was einer .create() Variante entsprichtCREATE OR REPLACE TABLE AS SELECT.

Unpartitionierte Tabellen

So erstellen und füllen Sie eine unpartitionierte Iceberg-Tabelle mithilfe der API: DataFrameWriterV2

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .tableProperty("format-version", "2") \ .createOrReplace()

So fügen Sie mithilfe der API Daten in eine bestehende unpartitionierte Iceberg-Tabelle ein: DataFrameWriterV2

input_data.writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_nopartitions") \ .append()

Partitionierte Tabellen

Um eine partitionierte Iceberg-Tabelle mithilfe der DataFrameWriterV2 API zu erstellen und aufzufüllen, können Sie eine lokale Sortierung verwenden, um Daten aufzunehmen:

input_data.sortWithinPartitions("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .tableProperty("format-version", "2") \ .partitionedBy("c_birth_country") \ .createOrReplace()

Um mithilfe der DataFrameWriterV2 API Daten in eine partitionierte Iceberg-Tabelle einzufügen, können Sie eine globale Sortierung verwenden, um Daten aufzunehmen:

input_data.orderBy("c_birth_country") \ .writeTo(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}_withpartitions") \ .append()

Daten in Iceberg-Tabellen aktualisieren

Das folgende Beispiel zeigt, wie Daten in einer Iceberg-Tabelle aktualisiert werden. In diesem Beispiel werden alle Zeilen geändert, die eine gerade Zahl in der c_customer_sk Spalte haben.

spark.sql(f""" UPDATE {CATALOG_NAME}.{db.name}.{table.name} SET c_email_address = 'even_row' WHERE c_customer_sk % 2 == 0 """)

Dieser Vorgang verwendet die copy-on-write Standardstrategie, sodass alle betroffenen Datendateien neu geschrieben werden.

Daten in Iceberg-Tabellen werden aktualisiert

Daten aktualisieren bezieht sich auf das Einfügen neuer Datensätze und das Aktualisieren vorhandener Datensätze in einer einzigen Transaktion. Um Daten in eine Iceberg-Tabelle hochzuladen, verwenden Sie die Anweisung. SQL MERGE INTO 

Im folgenden Beispiel wird der Inhalt der Tabelle{UPSERT_TABLE_NAME} innerhalb der Tabelle verschoben: {TABLE_NAME}

spark.sql(f""" MERGE INTO {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} t USING {UPSERT_TABLE_NAME} s ON t.c_customer_id = s.c_customer_id WHEN MATCHED THEN UPDATE SET t.c_email_address = s.c_email_address WHEN NOT MATCHED THEN INSERT * """)
  • Wenn ein Kundendatensatz, der sich in {UPSERT_TABLE_NAME} befindet, bereits {TABLE_NAME} mit demselben vorhanden istc_customer_id, überschreibt der {UPSERT_TABLE_NAME} c_email_address Datensatzwert den vorhandenen Wert (Aktualisierungsvorgang).

  • Wenn ein Kundendatensatz, der {UPSERT_TABLE_NAME} sich in befindet, nicht existiert{TABLE_NAME}, wird der {UPSERT_TABLE_NAME} Datensatz hinzugefügt {TABLE_NAME} (Vorgang einfügen).

Löschen von Daten in Iceberg-Tabellen

Um Daten aus einer Iceberg-Tabelle zu löschen, verwenden Sie den DELETE FROM Ausdruck und geben Sie einen Filter an, der den zu löschenden Zeilen entspricht.

spark.sql(f""" DELETE FROM {CATALOG_NAME}.{db.name}.{table.name} WHERE c_customer_sk % 2 != 0 """)

Wenn der Filter auf eine gesamte Partition zutrifft, führt Iceberg eine reine Metadaten-Löschung durch und belässt die Datendateien an Ort und Stelle. Andernfalls werden nur die betroffenen Datendateien neu geschrieben.

Die Methode delete verwendet die Datendateien, die von der WHERE Klausel betroffen sind, und erstellt eine Kopie davon ohne die gelöschten Datensätze. Anschließend wird ein neuer Tabellen-Snapshot erstellt, der auf die neuen Datendateien verweist. Daher sind die gelöschten Datensätze immer noch in den älteren Snapshots der Tabelle vorhanden. Wenn Sie beispielsweise den vorherigen Snapshot der Tabelle abrufen, werden Ihnen die Daten angezeigt, die Sie gerade gelöscht haben. Informationen zum Entfernen nicht benötigter alter Snapshots mit den zugehörigen Datendateien zu Säuberungszwecken finden Sie im Abschnitt Dateien mithilfe der Komprimierung verwalten weiter unten in diesem Handbuch.

Lesen von Daten

Sie können den aktuellen Status Ihrer Iceberg-Tabellen in Spark sowohl mit Spark SQL als auch lesen. DataFrames 

Beispiel mit Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{db.name}.{table.name} LIMIT 5 """)

Beispiel mit der DataFrames API:

df = spark.table(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}").limit(5)

Zeitreisen nutzen

Jeder Schreibvorgang (Einfügen, Aktualisieren, Hochsetzen, Löschen) in einer Iceberg-Tabelle erstellt einen neuen Snapshot. Sie können diese Snapshots dann für Zeitreisen verwenden, um in die Vergangenheit zu reisen und den Status einer Tabelle in der Vergangenheit zu überprüfen.

Informationen zum Abrufen des Verlaufs von Snapshots für Tabellen mithilfe von Werten snapshot-id und Zeitangaben finden Sie im Abschnitt Zugreifen auf Metadaten weiter unten in diesem Handbuch.

Die folgende Zeitreiseabfrage zeigt den Status einer Tabelle auf der Grundlage eines bestimmten snapshot-id Status an.

Verwenden von Spark SQL:

spark.sql(f""" SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME} VERSION AS OF {snapshot_id} """)

Mithilfe der DataFrames API:

df_1st_snapshot_id = spark.read.option("snapshot-id", snapshot_id) \ .format("iceberg") \ .load(f"{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

Die folgende Zeitreiseabfrage zeigt den Status einer Tabelle auf der Grundlage des letzten Snapshots, der vor einem bestimmten Zeitstempel erstellt wurde, in Millisekunden () an. as-of-timestamp

Verwenden von Spark SQL:

spark.sql(f""" SELECT * FROM dev.{db.name}.{table.name} TIMESTAMP AS OF '{snapshot_ts}' """)

Mithilfe der DataFrames API:

df_1st_snapshot_ts = spark.read.option("as-of-timestamp", snapshot_ts) \ .format("iceberg") \ .load(f"dev.{DB_NAME}.{TABLE_NAME}") \ .limit(5)

Verwendung inkrementeller Abfragen

Sie können Iceberg-Snapshots auch verwenden, um angehängte Daten inkrementell zu lesen. 

Hinweis: Derzeit unterstützt dieser Vorgang das Lesen von Daten aus Snapshots. append Das Abrufen von Daten aus Operationen wie replaceoverwrite, oder wird nicht unterstützt. delete  Darüber hinaus werden inkrementelle Lesevorgänge in der Spark-SQL-Syntax nicht unterstützt.

Im folgenden Beispiel werden alle Datensätze abgerufen, die an eine Iceberg-Tabelle zwischen dem Snapshot start-snapshot-id (exklusiv) und end-snapshot-id (inklusive) angehängt wurden.

df_incremental = (spark.read.format("iceberg") .option("start-snapshot-id", snapshot_id_start) .option("end-snapshot-id", snapshot_id_end) .load(f"glue_catalog.{DB_NAME}.{TABLE_NAME}") )

Zugreifen auf Metadaten

Iceberg bietet Zugriff auf seine Metadaten über SQL. Sie können auf die Metadaten für jede beliebige Tabelle (<table_name>) zugreifen, indem Sie den Namespace abfragen. <table_name>.<metadata_table> Eine vollständige Liste der Metadatentabellen finden Sie in der Iceberg-Dokumentation unter Tabellen überprüfen.

Das folgende Beispiel zeigt, wie Sie auf die Iceberg-Historien-Metadatentabelle zugreifen können, in der der Verlauf der Commits (Änderungen) für eine Iceberg-Tabelle angezeigt wird. 

Verwenden von Spark SQL (mit der %%sql Magie) von einem Amazon EMR Studio-Notizbuch aus:

Spark.sql(f“”” SELECT * FROM {CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history LIMIT 5 """)

Mithilfe der DataFrames API:

spark.read.format("iceberg").load("{CATALOG_NAME}.{DB_NAME}.{TABLE_NAME}.history").show(5,False)

Beispielausgabe:

Beispiel für die Ausgabe von Metadaten aus einer Iceberg-Tabelle
DatenschutzNutzungsbedingungen für die WebsiteCookie-Einstellungen
© 2025, Amazon Web Services, Inc. oder Tochtergesellschaften. Alle Rechte vorbehalten.