Arbeiten mit einem Hudi-Datensatz - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Arbeiten mit einem Hudi-Datensatz

Hudi unterstützt das Einfügen, Aktualisieren und Löschen von Daten in Hudi-Datensätzen über Spark. Weitere Informationen finden Sie unter Hudi-Tabellen schreiben in der Apache-Hudi-Dokumentation.

Die folgenden Beispiele zeigen, wie Sie die interaktive Spark-Shell starten, Spark Submit verwenden oder Amazon EMR Notebooks verwenden, um mit Hudi auf Amazon EMR zu arbeiten. Sie können auch das DeltaStreamer Hudi-Hilfsprogramm oder andere Tools verwenden, um in einen Datensatz zu schreiben. In diesem Abschnitt zeigen die Beispiele, wie Sie mit Datensätzen arbeiten, indem Sie die Spark-Shell verwenden, während Sie SSH als hadoop Standardbenutzer mit dem Master-Knoten verbunden sind.

Wenn Sie Amazon EMR 6.7.0 oder höher ausführen spark-shell oder spark-sql verwenden, übergeben Sie die folgenden Befehle. spark-submit

Anmerkung

Amazon EMR 6.7.0 verwendet Apache Hudi 0.11.0-amzn-0, das erhebliche Verbesserungen gegenüber früheren Hudi-Versionen enthält. Weitere Informationen finden Sie im Apache-Hudi-0.11.0-Migrationshandbuch. Die Beispiele auf dieser Registerkarte spiegeln diese Änderungen wider.

So öffnen Sie die Spark-Shell auf dem Primärknoten
  1. Stellen Sie mithilfe von Connect zum Primärknoten herSSH. Weitere Informationen finden Sie unter Connect mit dem primären Knoten herstellen SSH im Amazon EMR Management Guide.

  2. Geben Sie den folgenden Befehl ein, um die Spark-Shell zu starten. Um die PySpark Shell zu verwenden, ersetzen Sie spark-shell mit pyspark.

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Wenn Sie Amazon EMR 6.6.x oder früher ausführen spark-shell oder spark-sql verwenden, übergeben Sie die folgenden Befehle. spark-submit

Anmerkung
  • Amazon EMR 6.2 und 5.31 und höher (Hudi 0.6.x und höher) können das in der Konfiguration weglassen. spark-avro.jar

  • Amazon EMR 6.5 und 5.35 und höher (Hudi 0.9.x und höher) können in der Konfiguration weglassenspark.sql.hive.convertMetastoreParquet=false.

  • Amazon EMR 6.6 und 5.36 und höher (Hudi 0.10.x und höher) müssen die HoodieSparkSessionExtension Konfiguration enthalten, wie sie im Spark-Leitfaden für Version: 0.10.0 beschrieben ist:

    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
So öffnen Sie die Spark-Shell auf dem Primärknoten
  1. Stellen Sie mithilfe von Connect zum Primärknoten herSSH. Weitere Informationen finden Sie unter Connect mit dem primären Knoten herstellen SSH im Amazon EMR Management Guide.

  2. Geben Sie den folgenden Befehl ein, um die Spark-Shell zu starten. Um die PySpark Shell zu verwenden, ersetzen Sie spark-shell mit pyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Um Hudi mit Amazon EMR Notebooks zu verwenden, müssen Sie zuerst die Hudi-JAR-Dateien aus dem lokalen Dateisystem HDFS auf den Master-Knoten des Notebook-Clusters kopieren. Anschließend verwenden Sie den Notebook-Editor, um Ihr EMR Notebook für die Verwendung von Hudi zu konfigurieren.

Um Hudi mit Amazon EMR Notebooks zu verwenden
  1. Erstellen und starten Sie einen Cluster für Amazon EMR Notebooks. Weitere Informationen finden Sie unter Erstellen von EMR Amazon-Clustern für Notebooks im Amazon EMR Management Guide.

  2. Stellen Sie mithilfe der JAR-Dateien eine Connect zum Master-Knoten des Clusters her SSH und kopieren Sie sie dann aus dem lokalen Dateisystem in, HDFS wie in den folgenden Beispielen gezeigt. In diesem Beispiel erstellen wir aus HDFS Gründen der Übersichtlichkeit der Dateiverwaltung ein Verzeichnis. Falls gewünschtHDFS, können Sie Ihr eigenes Ziel wählen.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. Öffnen Sie den Notebook-Editor, geben Sie den Code aus dem folgenden Beispiel ein und führen Sie ihn aus.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Um Hudi mit Amazon EMR Notebooks zu verwenden, müssen Sie zuerst die Hudi-JAR-Dateien aus dem lokalen Dateisystem HDFS auf den Master-Knoten des Notebook-Clusters kopieren. Anschließend verwenden Sie den Notebook-Editor, um Ihr EMR Notebook für die Verwendung von Hudi zu konfigurieren.

Um Hudi mit Amazon EMR Notebooks zu verwenden
  1. Erstellen und starten Sie einen Cluster für Amazon EMR Notebooks. Weitere Informationen finden Sie unter Erstellen von EMR Amazon-Clustern für Notebooks im Amazon EMR Management Guide.

  2. Stellen Sie mithilfe der JAR-Dateien eine Connect zum Master-Knoten des Clusters her SSH und kopieren Sie sie dann aus dem lokalen Dateisystem in, HDFS wie in den folgenden Beispielen gezeigt. In diesem Beispiel erstellen wir aus HDFS Gründen der Übersichtlichkeit der Dateiverwaltung ein Verzeichnis. Falls gewünschtHDFS, können Sie Ihr eigenes Ziel wählen.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. Öffnen Sie den Notebook-Editor, geben Sie den Code aus dem folgenden Beispiel ein und führen Sie ihn aus.

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Initialisieren Sie eine Spark-Sitzung für Hudi

Wenn Sie Scala verwenden, müssen Sie die folgenden Klassen in Ihre Spark-Sitzung importieren. Dies muss einmal pro Spark-Sitzung erfolgen.

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

Schreiben Sie in einen Hudi-Datensatz

Die folgenden Beispiele zeigen, wie Sie einen Hudi-Datensatz erstellen DataFrame und ihn als Hudi-Datensatz schreiben.

Anmerkung

Um Codebeispiele in die Spark-Shell einzufügen, geben Sie an der Eingabeaufforderung :paste ein, fügen das Beispiel ein und drücken dann CTRL + D.

Jedes Mal, wenn Sie einen DataFrame in einen Hudi-Datensatz schreiben, müssen Sie Folgendes angeben. DataSourceWriteOptions Viele dieser Optionen sind unter den Schreiboperationen wahrscheinlich identisch. Im folgenden Beispiel werden allgemeine Optionen unter Verwendung der Variablen hudiOptions angegeben, die von nachfolgenden Beispielen verwendet wird.

Anmerkung

Amazon EMR 6.7.0 verwendet Apache Hudi 0.11.0-amzn-0, das erhebliche Verbesserungen gegenüber früheren Hudi-Versionen enthält. Weitere Informationen finden Sie im Apache-Hudi-0.11.0-Migrationshandbuch. Die Beispiele auf dieser Registerkarte spiegeln diese Änderungen wider.

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')
Anmerkung

Möglicherweise sehen Sie in Codebeispielen und Benachrichtigungen „hoodie“ anstelle von Hudi. In der Hudi-Codebasis wird häufig die alte Schreibweise „hoodie“ verwendet.

DataSourceWriteOptions Referenz für Hudi
Option Beschreibung

TABLE_NAME

Der Tabellenname, unter dem das Dataset registriert werden soll.

TABLE_TYPE_OPT_KEY

Optional. Gibt an, ob das Dataset als "COPY_ON_WRITE" oder "MERGE_ON_READ" erstellt wird. Der Standardwert ist "COPY_ON_WRITE".

RECORDKEY_FIELD_OPT_KEY

Das Datensatzschlüsselfeld, dessen Wert als die recordKey-Komponente von HoodieKey verwendet wird. Der tatsächliche Wert wird durch Aufrufen von .toString() für den Feldwert abgerufen. Verschachtelte Felder können mithilfe der Punktnotation angegeben werden, z. B a.b.c.

PARTITIONPATH_FIELD_OPT_KEY

Das Partitionspfadfeld, dessen Wert als die Komponente partitionPath von HoodieKey verwendet wird. Der tatsächliche Wert wird durch Aufrufen von .toString() für den Feldwert abgerufen.

PRECOMBINE_FIELD_OPT_KEY

Das Feld, das in der Vorab-Kombination vor dem tatsächlichen Schreiben verwendet wird. Wenn zwei Datensätze denselben Schlüsselwert haben, wählt Hudi den Datensatz mit dem größten Wert für das Vorab-Kombinationsfeld wie von Object.compareTo(..) festgelegt aus.

Die folgenden Optionen sind nur erforderlich, um die Hudi-Datensatz-Tabelle in Ihrem Metastore zu registrieren. Wenn Sie Ihr Hudi-Datensatz nicht als Tabelle im Hive-Metastore registrieren, sind diese Optionen nicht erforderlich.

DataSourceWriteOptions Referenz für Hive
Option Beschreibung

HIVE_DATABASE_OPT_KEY

Die Hive-Datenbank, mit der synchronisiert werden soll. Der Standardwert ist "default".

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

Die Klasse, mit der Partitionsfeldwerte in Hive-Partitionsspalten extrahiert werden.

HIVE_PARTITION_FIELDS_OPT_KEY

Das Feld im Dataset, anhand dessen Hive-Partitionsspalten bestimmt werden sollen.

HIVE_SYNC_ENABLED_OPT_KEY

Wenn diese Option auf "true" eingestellt ist, wird das Dataset beim Apache-Hive-Metastore registriert. Der Standardwert ist "false".

HIVE_TABLE_OPT_KEY

Erforderlich Der Name der Tabelle in Hive, mit der synchronisiert werden soll. z. B. "my_hudi_table_cow".

HIVE_USER_OPT_KEY

Optional. Der Hive-Benutzername, der bei der Synchronisierung verwendet werden soll. z. B. "hadoop".

HIVE_PASS_OPT_KEY

Optional. Das von HIVE_USER_OPT_KEY angegebene Hive-Passwort für den Benutzer.

HIVE_URL_OPT_KEY

Der Hive-Metastore. URL

Upsert Daten

Das folgende Beispiel zeigt, wie Daten durch Schreiben von A verändert werden. DataFrame Im Gegensatz zum vorherigen Einfügebeispiel wird der Wert OPERATION_OPT_KEY auf UPSERT_OPERATION_OPT_VAL eingestellt. Darüber hinaus wird mit .mode(SaveMode.Append) angegeben, dass der Datensatz angehängt werden soll.

Anmerkung

Amazon EMR 6.7.0 verwendet Apache Hudi 0.11.0-amzn-0, das erhebliche Verbesserungen gegenüber früheren Hudi-Versionen enthält. Weitere Informationen finden Sie im Apache-Hudi-0.11.0-Migrationshandbuch. Die Beispiele auf dieser Registerkarte spiegeln diese Änderungen wider.

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

Einen Datensatz löschen

Um einen Datensatz dauerhaft zu löschen, können Sie eine leere Datenlast einfügen. In diesem Fall gibt die Option PAYLOAD_CLASS_OPT_KEY die Klasse EmptyHoodieRecordPayload an. Im Beispiel wird derselbe DataFrame,, verwendetupdateDF, der im Upsert-Beispiel verwendet wurde, um denselben Datensatz anzugeben.

Anmerkung

Amazon EMR 6.7.0 verwendet Apache Hudi 0.11.0-amzn-0, das erhebliche Verbesserungen gegenüber früheren Hudi-Versionen enthält. Weitere Informationen finden Sie im Apache-Hudi-0.11.0-Migrationshandbuch. Die Beispiele auf dieser Registerkarte spiegeln diese Änderungen wider.

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

Sie können Daten auch dauerhaft löschen, indem Sie OPERATION_OPT_KEY auf DELETE_OPERATION_OPT_VAL einstellen, um alle Datensätze in dem von Ihnen eingereichten Datensatz zu entfernen. Anweisungen zur Durchführung von „weichen Löschungen“ und weitere Informationen zum Löschen von Daten, die in Hudi-Tabellen gespeichert sind, finden Sie unter Löschen in der Apache-Hudi-Dokumentation.

Aus einem Hudi-Datensatz lesen

Um Daten zum aktuellen Zeitpunkt abzurufen, führt Hudi standardmäßig Snapshot-Abfragen durch. Im Folgenden finden Sie ein Beispiel für die Abfrage des in S3 geschriebenen Datensatzes in Schreiben Sie in einen Hudi-Datensatz. Ersetzen s3://DOC-EXAMPLE-BUCKET/myhudidataset mit Ihrem Tabellenpfad und fügen Sie Platzhalter-Sternchen für jede Partitionsebene sowie ein zusätzliches Sternchen hinzu. In diesem Beispiel gibt es eine Partitionsebene, daher haben wir zwei Platzhaltersymbole hinzugefügt.

Anmerkung

Amazon EMR 6.7.0 verwendet Apache Hudi 0.11.0-amzn-0, das erhebliche Verbesserungen gegenüber früheren Hudi-Versionen enthält. Weitere Informationen finden Sie im Apache-Hudi-0.11.0-Migrationshandbuch. Die Beispiele auf dieser Registerkarte spiegeln diese Änderungen wider.

val snapshotQueryDF = spark.read .format("hudi") .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset' + '/*/*') snapshotQueryDF.show()

Inkrementelle Abfragen

Sie können mit Hudi auch inkrementelle Abfragen durchführen, um einen Stream von Datensätzen abzurufen, die sich seit einem bestimmten Commit-Zeitstempel geändert haben. Setzen Sie dazu das Feld QUERY_TYPE_OPT_KEY auf QUERY_TYPE_INCREMENTAL_OPT_VAL. Fügen Sie dann einen Wert für BEGIN_INSTANTTIME_OPT_KEY hinzu, um alle Datensätze abzurufen, die seit dem angegebenen Zeitpunkt geschrieben wurden. Inkrementelle Abfragen sind in der Regel zehnmal effizienter als ihre Gegenstücke im Batch-Modus, da sie nur geänderte Datensätze verarbeiten.

Wenn Sie inkrementelle Abfragen ausführen, verwenden Sie den Pfad der Stammtabelle (Basistabelle) ohne die für Snapshot-Abfragen verwendeten Platzhaltersterchen.

Anmerkung

Presto unterstützt keine inkrementellen Abfragen.

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset') incQueryDF.show()

Weitere Informationen zum Lesen von Hudi-Datensätzen finden Sie unter Abfragen von Hudi-Tabellen in der Apache-Hudi-Dokumentation.