Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Arbeiten mit einem Hudi-Datensatz
Hudi unterstützt das Einfügen, Aktualisieren und Löschen von Daten in Hudi-Datensätzen über Spark. Weitere Informationen finden Sie unter Hudi-Tabellen schreiben
Die folgenden Beispiele zeigen, wie Sie die interaktive Spark-Shell starten, Spark Submit verwenden oder Amazon EMR Notebooks verwenden, um mit Hudi auf Amazon EMR zu arbeiten. Sie können auch das DeltaStreamer Hudi-Hilfsprogramm oder andere Tools verwenden, um in einen Datensatz zu schreiben. In diesem Abschnitt zeigen die Beispiele, wie Sie mit Datensätzen arbeiten, indem Sie die Spark-Shell verwenden, während Sie SSH als hadoop
Standardbenutzer mit dem Master-Knoten verbunden sind.
Wenn Sie Amazon EMR 6.7.0 oder höher ausführen spark-shell
oder spark-sql
verwenden, übergeben Sie die folgenden Befehle. spark-submit
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
So öffnen Sie die Spark-Shell auf dem Primärknoten
-
Stellen Sie mithilfe von Connect zum Primärknoten herSSH. Weitere Informationen finden Sie unter Connect mit dem primären Knoten herstellen SSH im Amazon EMR Management Guide.
-
Geben Sie den folgenden Befehl ein, um die Spark-Shell zu starten. Um die PySpark Shell zu verwenden, ersetzen Sie
spark-shell
mitpyspark
.spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Wenn Sie Amazon EMR 6.6.x oder früher ausführen spark-shell
oder spark-sql
verwenden, übergeben Sie die folgenden Befehle. spark-submit
Anmerkung
-
Amazon EMR 6.2 und 5.31 und höher (Hudi 0.6.x und höher) können das in der Konfiguration weglassen.
spark-avro.jar
-
Amazon EMR 6.5 und 5.35 und höher (Hudi 0.9.x und höher) können in der Konfiguration weglassen
spark.sql.hive.convertMetastoreParquet=false
. -
Amazon EMR 6.6 und 5.36 und höher (Hudi 0.10.x und höher) müssen die
HoodieSparkSessionExtension
Konfiguration enthalten, wie sie im Spark-Leitfaden für Version: 0.10.0 beschrieben ist:--conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
So öffnen Sie die Spark-Shell auf dem Primärknoten
-
Stellen Sie mithilfe von Connect zum Primärknoten herSSH. Weitere Informationen finden Sie unter Connect mit dem primären Knoten herstellen SSH im Amazon EMR Management Guide.
-
Geben Sie den folgenden Befehl ein, um die Spark-Shell zu starten. Um die PySpark Shell zu verwenden, ersetzen Sie
spark-shell
mitpyspark
.spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Um Hudi mit Amazon EMR Notebooks zu verwenden, müssen Sie zuerst die Hudi-JAR-Dateien aus dem lokalen Dateisystem HDFS auf den Master-Knoten des Notebook-Clusters kopieren. Anschließend verwenden Sie den Notebook-Editor, um Ihr EMR Notebook für die Verwendung von Hudi zu konfigurieren.
Um Hudi mit Amazon EMR Notebooks zu verwenden
-
Erstellen und starten Sie einen Cluster für Amazon EMR Notebooks. Weitere Informationen finden Sie unter Erstellen von EMR Amazon-Clustern für Notebooks im Amazon EMR Management Guide.
-
Stellen Sie mithilfe der JAR-Dateien eine Connect zum Master-Knoten des Clusters her SSH und kopieren Sie sie dann aus dem lokalen Dateisystem in, HDFS wie in den folgenden Beispielen gezeigt. In diesem Beispiel erstellen wir aus HDFS Gründen der Übersichtlichkeit der Dateiverwaltung ein Verzeichnis. Falls gewünschtHDFS, können Sie Ihr eigenes Ziel wählen.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
Öffnen Sie den Notebook-Editor, geben Sie den Code aus dem folgenden Beispiel ein und führen Sie ihn aus.
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
Um Hudi mit Amazon EMR Notebooks zu verwenden, müssen Sie zuerst die Hudi-JAR-Dateien aus dem lokalen Dateisystem HDFS auf den Master-Knoten des Notebook-Clusters kopieren. Anschließend verwenden Sie den Notebook-Editor, um Ihr EMR Notebook für die Verwendung von Hudi zu konfigurieren.
Um Hudi mit Amazon EMR Notebooks zu verwenden
-
Erstellen und starten Sie einen Cluster für Amazon EMR Notebooks. Weitere Informationen finden Sie unter Erstellen von EMR Amazon-Clustern für Notebooks im Amazon EMR Management Guide.
-
Stellen Sie mithilfe der JAR-Dateien eine Connect zum Master-Knoten des Clusters her SSH und kopieren Sie sie dann aus dem lokalen Dateisystem in, HDFS wie in den folgenden Beispielen gezeigt. In diesem Beispiel erstellen wir aus HDFS Gründen der Übersichtlichkeit der Dateiverwaltung ein Verzeichnis. Falls gewünschtHDFS, können Sie Ihr eigenes Ziel wählen.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
Öffnen Sie den Notebook-Editor, geben Sie den Code aus dem folgenden Beispiel ein und führen Sie ihn aus.
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
Initialisieren Sie eine Spark-Sitzung für Hudi
Wenn Sie Scala verwenden, müssen Sie die folgenden Klassen in Ihre Spark-Sitzung importieren. Dies muss einmal pro Spark-Sitzung erfolgen.
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig
Schreiben Sie in einen Hudi-Datensatz
Die folgenden Beispiele zeigen, wie Sie einen Hudi-Datensatz erstellen DataFrame und ihn als Hudi-Datensatz schreiben.
Anmerkung
Um Codebeispiele in die Spark-Shell einzufügen, geben Sie an der Eingabeaufforderung :paste
ein, fügen das Beispiel ein und drücken dann CTRL
+ D
.
Jedes Mal, wenn Sie einen DataFrame in einen Hudi-Datensatz schreiben, müssen Sie Folgendes angeben. DataSourceWriteOptions
Viele dieser Optionen sind unter den Schreiboperationen wahrscheinlich identisch. Im folgenden Beispiel werden allgemeine Optionen unter Verwendung der Variablen
angegeben, die von nachfolgenden Beispielen verwendet wird.hudiOptions
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/
"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "
tableName
", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/
"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': '
tableName
', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName
', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/
')
Anmerkung
Möglicherweise sehen Sie in Codebeispielen und Benachrichtigungen „hoodie“ anstelle von Hudi. In der Hudi-Codebasis wird häufig die alte Schreibweise „hoodie“ verwendet.
DataSourceWriteOptions Referenz für Hudi | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Option | Beschreibung | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TABLE_NAME |
Der Tabellenname, unter dem das Dataset registriert werden soll. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TABLE_TYPE_OPT_KEY |
Optional. Gibt an, ob das Dataset als |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
RECORDKEY_FIELD_OPT_KEY |
Das Datensatzschlüsselfeld, dessen Wert als die |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PARTITIONPATH_FIELD_OPT_KEY |
Das Partitionspfadfeld, dessen Wert als die Komponente |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
PRECOMBINE_FIELD_OPT_KEY |
Das Feld, das in der Vorab-Kombination vor dem tatsächlichen Schreiben verwendet wird. Wenn zwei Datensätze denselben Schlüsselwert haben, wählt Hudi den Datensatz mit dem größten Wert für das Vorab-Kombinationsfeld wie von |
Die folgenden Optionen sind nur erforderlich, um die Hudi-Datensatz-Tabelle in Ihrem Metastore zu registrieren. Wenn Sie Ihr Hudi-Datensatz nicht als Tabelle im Hive-Metastore registrieren, sind diese Optionen nicht erforderlich.
DataSourceWriteOptions Referenz für Hive | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Option | Beschreibung | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_DATABASE_OPT_KEY |
Die Hive-Datenbank, mit der synchronisiert werden soll. Der Standardwert ist |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
Die Klasse, mit der Partitionsfeldwerte in Hive-Partitionsspalten extrahiert werden. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_PARTITION_FIELDS_OPT_KEY |
Das Feld im Dataset, anhand dessen Hive-Partitionsspalten bestimmt werden sollen. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_SYNC_ENABLED_OPT_KEY |
Wenn diese Option auf |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_TABLE_OPT_KEY |
Erforderlich Der Name der Tabelle in Hive, mit der synchronisiert werden soll. z. B. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_USER_OPT_KEY |
Optional. Der Hive-Benutzername, der bei der Synchronisierung verwendet werden soll. z. B. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_PASS_OPT_KEY |
Optional. Das von |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
HIVE_URL_OPT_KEY |
Der Hive-Metastore. URL |
Upsert Daten
Das folgende Beispiel zeigt, wie Daten durch Schreiben von A verändert werden. DataFrame Im Gegensatz zum vorherigen Einfügebeispiel wird der Wert OPERATION_OPT_KEY
auf UPSERT_OPERATION_OPT_VAL
eingestellt. Darüber hinaus wird mit .mode(SaveMode.Append)
angegeben, dass der Datensatz angehängt werden soll.
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("
s3://DOC-EXAMPLE-BUCKET/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("
new_value
")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/
"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('
new_value
')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/
')
Einen Datensatz löschen
Um einen Datensatz dauerhaft zu löschen, können Sie eine leere Datenlast einfügen. In diesem Fall gibt die Option PAYLOAD_CLASS_OPT_KEY
die Klasse EmptyHoodieRecordPayload
an. Im Beispiel wird derselbe DataFrame,, verwendetupdateDF
, der im Upsert-Beispiel verwendet wurde, um denselben Datensatz anzugeben.
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("
s3://DOC-EXAMPLE-BUCKET/myhudidataset/
"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("
s3://DOC-EXAMPLE-BUCKET/myhudidataset/
"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('
s3://DOC-EXAMPLE-BUCKET/myhudidataset/
')
Sie können Daten auch dauerhaft löschen, indem Sie OPERATION_OPT_KEY
auf DELETE_OPERATION_OPT_VAL
einstellen, um alle Datensätze in dem von Ihnen eingereichten Datensatz zu entfernen. Anweisungen zur Durchführung von „weichen Löschungen“ und weitere Informationen zum Löschen von Daten, die in Hudi-Tabellen gespeichert sind, finden Sie unter Löschen
Aus einem Hudi-Datensatz lesen
Um Daten zum aktuellen Zeitpunkt abzurufen, führt Hudi standardmäßig Snapshot-Abfragen durch. Im Folgenden finden Sie ein Beispiel für die Abfrage des in S3 geschriebenen Datensatzes in Schreiben Sie in einen Hudi-Datensatz. Ersetzen s3://DOC-EXAMPLE-BUCKET/myhudidataset
mit Ihrem Tabellenpfad und fügen Sie Platzhalter-Sternchen für jede Partitionsebene sowie ein zusätzliches Sternchen hinzu. In diesem Beispiel gibt es eine Partitionsebene, daher haben wir zwei Platzhaltersymbole hinzugefügt.
Anmerkung
Amazon EMR 6.7.0 verwendet Apache Hudi
val snapshotQueryDF = spark.read .format("hudi") .load(
"s3://DOC-EXAMPLE-BUCKET/myhudidataset"
) .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("
s3://DOC-EXAMPLE-BUCKET/myhudidataset
" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('
s3://DOC-EXAMPLE-BUCKET/myhudidataset
' + '/*/*') snapshotQueryDF.show()
Inkrementelle Abfragen
Sie können mit Hudi auch inkrementelle Abfragen durchführen, um einen Stream von Datensätzen abzurufen, die sich seit einem bestimmten Commit-Zeitstempel geändert haben. Setzen Sie dazu das Feld QUERY_TYPE_OPT_KEY
auf QUERY_TYPE_INCREMENTAL_OPT_VAL
. Fügen Sie dann einen Wert für BEGIN_INSTANTTIME_OPT_KEY
hinzu, um alle Datensätze abzurufen, die seit dem angegebenen Zeitpunkt geschrieben wurden. Inkrementelle Abfragen sind in der Regel zehnmal effizienter als ihre Gegenstücke im Batch-Modus, da sie nur geänderte Datensätze verarbeiten.
Wenn Sie inkrementelle Abfragen ausführen, verwenden Sie den Pfad der Stammtabelle (Basistabelle) ohne die für Snapshot-Abfragen verwendeten Platzhaltersterchen.
Anmerkung
Presto unterstützt keine inkrementellen Abfragen.
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY,
<beginInstantTime>
) .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset
" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime':
<beginInstantTime>
, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset
') incQueryDF.show()
Weitere Informationen zum Lesen von Hudi-Datensätzen finden Sie unter Abfragen von Hudi-Tabellen