Bekerja dengan set data Hudi - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Bekerja dengan set data Hudi

Hudi mendukung penyisipan, perbaruan, dan penghapusan data dalam set data Hudi melalui Spark. Untuk informasi selengkapnya, lihat Menulis tabel Hudi dalam dokumentasi Apache Hudi.

Contoh berikut menunjukkan cara meluncurkan shell Spark interaktif, gunakan Spark kirim, atau gunakan Amazon EMR Notebooks untuk bekerja dengan Hudi di Amazon EMR. Anda juga dapat menggunakan DeltaStreamer utilitas Hudi atau alat lain untuk menulis ke set data. Sepanjang bagian ini, contoh menunjukkan bekerja dengan set data menggunakan shell Spark saat terhubung ke simpul utama menggunakan SSH sebagai default hadoop pengguna.

catatan

Hudi 0.6.0 memasukkan paket spark-avro sebagai ketergantungan dengan nama yang berbeda. Anda tidak perlu menyertakan spark-avro.jar dalam konfigurasi Anda saat Anda menggunakan EMR 5.31.0 dan yang lebih baru.

spark-shell
Untuk membuka shell Spark pada simpul utama
  1. Connect ke simpul utama menggunakan SSH. Untuk informasi selengkapnya, lihat Connect ke simpul utama menggunakan SSH di Panduan Pengelolaan Amazon EMR.

  2. Masukkan perintah berikut untuk meluncurkan shell Spark. Untuk menggunakan PySpark shell, ganti spark-shell dengan pyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
spark-submit

Untuk mengirimkan aplikasi Spark yang menggunakan Hudi, pastikan untuk lulus parameter berikut untuk spark-submit.

spark-submit \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer"\ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Amazon EMR Notebooks

Untuk menggunakan Hudi dengan Amazon EMR Notebooks, Anda harus terlebih dahulu menyalin file jar Hudi dari sistem file lokal untuk HDFS pada simpul utama klaster notebook. Anda kemudian menggunakan editor notebook untuk mengonfigurasi notebook EMR Anda untuk menggunakan Hudi.

Untuk menggunakan Hudi dengan Amazon EMR Notebooks
  1. Buat dan luncurkan sebuah klaster untuk Amazon EMR Notebooks. Untuk informasi lebih lanjut, lihat Membuat klaster Amazon EMR untuk Notebooks di Panduan Pengelolaan Amazon EMR.

  2. Connect ke simpul utama klaster menggunakan SSH dan kemudian salin file jar dari filesystem lokal untuk HDFS seperti yang ditunjukkan dalam contoh berikut. Dalam contoh, kita membuat direktori di HDFS untuk kejelasan manajemen file. Anda dapat memilih tujuan Anda sendiri dalam HDFS, jika diinginkan.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. Buka editor notebook, masukkan kode dari contoh berikut, dan jalankan.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Inisialisasi sesi Spark untuk Hudi

Bila Anda menggunakan Scala, Anda harus mengimpor kelas berikut di sesi Spark Anda. Hal ini perlu dilakukan sekali per sesi Spark.

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor

Tulis ke set data Hudi

Contoh berikut menunjukkan cara membuat DataFrame dan menuliskannya sebagai set data Hudi.

catatan

Untuk menyisipkan contoh kode ke shell Spark, ketik :paste pada prompt, tempel contoh, dan kemudian tekan CTRL + D.

Setiap kali Anda menulis DataFrame set data Hudi, Anda harus menentukanDataSourceWriteOptions. Banyak dari opsi ini cenderung identik antara operasi tulis. Contoh berikut menentukan pilihan umum menggunakan hudiOptions variabel, yang digunakan contoh berikutnya.

Scala
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "my_hudi_table", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "my_hudi_table", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
PySpark
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'my_hudi_table', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'my_hudi_table', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')
catatan

Anda mungkin melihat "hoodie" alih-alih Hudi dalam contoh kode dan pemberitahuan. Basis kode Hudi secara luas menggunakan ejaan "hoodie" lama.

DataSourceWriteOptions referensi untuk Hudi
Opsi Deskripsi

TABLE_NAME

Nama tabel untuk mendaftarkan set data.

TABLE_TYPE_OPT_KEY

Opsional. Menentukan apakah set data dibuat sebagai "COPY_ON_WRITE" atau "MERGE_ON_READ". Default adalah "COPY_ON_WRITE".

RECORDKEY_FIELD_OPT_KEY

Bidang kunci catatan yang nilainya akan digunakan sebagai recordKey komponen dari HoodieKey. Nilai aktual akan diperoleh dengan meminta .toString() pada nilai bidang. Bidang berlapis dapat ditentukan menggunakan notasi titik, misalnya, a.b.c.

PARTITIONPATH_FIELD_OPT_KEY

Bidang jalur partisi yang nilainya akan digunakan sebagai partitionPath komponen dari HoodieKey. Nilai aktual akan diperoleh dengan meminta .toString() pada nilai bidang.

PRECOMBINE_FIELD_OPT_KEY

Bidang yang digunakan dalam pra-menggabungkan sebelum penulisan yang sebenarnya. Ketika dua catatan memiliki nilai kunci yang sama, Hudi memilih tyang memiliki nilai terbesar untuk bidang penggabungan sebagaimana ditentukan oleh Object.compareTo(..).

Opsi berikut diperlukan hanya untuk mendaftarkan tabel set data Hudi metastore Anda. Jika Anda tidak mendaftar set data Hudi Anda sebagai tabel di metastore Hive, pilihan ini tidak diperlukan.

DataSourceWriteOptions referensi untuk Hive
Opsi Deskripsi

HIVE_DATABASE_OPT_KEY

Basis data Hive untuk disinkronkan ke. Default adalah "default".

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

Kelas yang digunakan untuk mengekstrak nilai bidang ke kolom partisi Hive.

HIVE_PARTITION_FIELDS_OPT_KEY

Bidang dalam set data yang digunakan untuk menentukan kolom partisi Hive.

HIVE_SYNC_ENABLED_OPT_KEY

Ketika diatur ke "true", daftarkan set data dengan metastore Apache Hive. Default adalah "false".

HIVE_TABLE_OPT_KEY

Diperlukan. Nama tabel di Hive untuk disinkronkan ke. Sebagai contoh, "my_hudi_table_cow".

HIVE_USER_OPT_KEY

Opsional. Nama pengguna Hive untuk digunakan saat sinkronisasi. Sebagai contoh, "hadoop".

HIVE_PASS_OPT_KEY

Opsional. Kata sandi Hive untuk pengguna yang ditentukan oleh HIVE_USER_OPT_KEY.

HIVE_URL_OPT_KEY

URL metastore Hive.

Tambahkan data

Contoh berikut menunjukkan bagaimana cara menambahkan data dengan menulis DataFrame. Berbeda dengan contoh penyisipan sebelumnya, nilai OPERATION_OPT_KEY diatur ke UPSERT_OPERATION_OPT_VAL. Selain itu, .mode(SaveMode.Append) ditentukan untuk menunjukkan bahwa catatan harus ditambahkan.

Scala
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
PySpark
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

Hapus catatan

Untuk menghapus catatan, Anda dapat menambahkan muatan kosong. Dalam hal ini, pilihan PAYLOAD_CLASS_OPT_KEY menentukan EmptyHoodieRecordPayload kelas. Contoh menggunakan yang sama DataFrame,updateDF, digunakan dalam contoh penambahan untuk menentukan catatan yang sama.

Scala
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
PySpark
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

Anda juga dapat menghapus data dengan menyetel OPERATION_OPT_KEY ke DELETE_OPERATION_OPT_VAL untuk menghapus semua catatan dalam set data yang Anda kirimkan. Untuk petunjuk tentang melakukan penghapusan secara halus, dan untuk informasi lebih lanjut tentang menghapus data yang disimpan dalam tabel Hudi, lihat Menghapus dalam dokumentasi Apache Hudi.

Baca dari set data Hudi

Untuk mengambil data pada saat ini, Hudi melakukan kueri snapshot secara default. Berikut ini adalah contoh mengkueri set data yang ditulis ke S3 di Tulis ke set data Hudi. Ganti s3://DOC-EXAMPLE-BUCKET/myhudidataset dengan jalur tabel Anda, dan tambahkan tanda bintang wildcard untuk setiap tingkat partisi, ditambah satu tanda bintang tambahan. Dalam contoh ini, ada satu tingkat partisi, jadi kami telah menambahkan dua simbol wildcard.

Scala
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" + "/*/*")) snapshotQueryDF.show()
PySpark
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset' + '/*/*') snapshotQueryDF.show()

Kueri tambahan

Anda juga dapat melakukan kueri tambahan dengan Hudi untuk mendapatkan aliran catatan yang telah berubah sejak stempel waktu komit diberikan. Untuk melakukannya, atur QUERY_TYPE_OPT_KEY bidang ke QUERY_TYPE_INCREMENTAL_OPT_VAL. Kemudian, tambahkan nilai BEGIN_INSTANTTIME_OPT_KEY untuk mendapatkan semua catatan yang ditulis sejak waktu yang ditentukan. Permintaan tambahan biasanya sepuluh kali lebih efisien daripada rekan batch mereka karena hanya memproses catatan yang berubah.

Ketika Anda melakukan kueri tambahan, gunakan jalur tabel akar (dasar) tanpa tanda bintang wildcard yang digunakan untuk kueri Snapshot.

catatan

Presto tidak mendukung kueri tambahan.

Scala
(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" )) incQueryDF.show()
PySpark
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset') incQueryDF.show()

Untuk informasi lebih lanjut tentang membaca dari set data Hudi, lihat Mengkueri tabel Hudi dalam dokumentasi Apache Hudi.