Uso de un conjunto de datos de Hudi - Amazon EMR

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de un conjunto de datos de Hudi

Hudi permite insertar, actualizar y eliminar datos de conjuntos de datos de Hudi a través de Spark. Para obtener más información, consulte Escritura de tablas de Hudi en la documentación de Apache Hudi.

Los siguientes ejemplos muestran cómo lanzar la consola interactiva de Spark, cómo usar Spark submit o cómo usar Amazon EMR Notebooks para trabajar con Hudi en Amazon. EMR También puedes usar la DeltaStreamer utilidad Hudi u otras herramientas para escribir en un conjunto de datos. A lo largo de esta sección, los ejemplos muestran cómo trabajar con conjuntos de datos mediante el shell de Spark mientras se está conectado al nodo maestro y se utiliza SSH como usuario predeterminadohadoop.

Cuando spark-shell ejecutes o spark-submit spark-sql utilices Amazon EMR 6.7.0 o una versión posterior, pasa los siguientes comandos.

nota

Amazon EMR 6.7.0 usa Apache Hudi 0.11.0-amzn-0, que contiene mejoras significativas con respecto a las versiones anteriores de Hudi. Para obtener más información, consulte la Guía de migración a Apache Hudi 0.11.0. Los ejemplos de esta pestaña reflejan estos cambios.

Para abrir el intérprete de comandos de Spark en el nodo principal
  1. Conéctese al nodo principal medianteSSH. Para obtener más información, consulte Conectarse al nodo principal mediante SSH la Guía EMR de administración de Amazon.

  2. Introduzca el siguiente comando para iniciar el shell de Spark. Para usar la PySpark carcasa, sustituya spark-shell with pyspark.

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Cuando ejecute spark-shell o spark-sql utilice Amazon EMR 6.6.x o una versión anterior, ejecute los siguientes comandos. spark-submit

nota
  • Amazon EMR 6.2 y 5.31 y versiones posteriores (Hudi 0.6.x y versiones posteriores) pueden omitirlas en la configuración. spark-avro.jar

  • Amazon EMR 6.5 y 5.35 y versiones posteriores (Hudi 0.9.x y versiones posteriores) pueden omitir spark.sql.hive.convertMetastoreParquet=false en la configuración.

  • Amazon EMR 6.6 y 5.36 y versiones posteriores (Hudi 0.10.x y versiones posteriores) deben incluir la HoodieSparkSessionExtension configuración descrita en la versión: 0.10.0 Spark Guide:

    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Para abrir el intérprete de comandos de Spark en el nodo principal
  1. Conéctese al nodo principal medianteSSH. Para obtener más información, consulte Conectarse al nodo principal mediante SSH la Guía EMR de administración de Amazon.

  2. Introduzca el siguiente comando para iniciar el shell de Spark. Para usar la PySpark carcasa, sustituya spark-shell with pyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Para usar Hudi con Amazon EMR Notebooks, primero debes copiar los archivos jar de Hudi del sistema de archivos local al HDFS nodo principal del clúster de cuadernos. A continuación, utilice el editor de cuadernos para configurar su EMR portátil de forma que utilice Hudi.

Para usar Hudi con Amazon Notebooks EMR
  1. Crea y lanza un clúster para Amazon EMR Notebooks. Para obtener más información, consulta Cómo crear EMR clústeres de Amazon para blocs de notas en la Amazon EMR Management Guide.

  2. Conéctese al nodo principal del clúster mediante los archivos jar del sistema de archivos local SSH y, a continuación, cópielos HDFS como se muestra en los siguientes ejemplos. En el ejemplo, creamos un directorio HDFS para facilitar la administración de archivos. Si lo deseaHDFS, puede elegir su propio destino en.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. Abra el editor de bloc de notas, escriba el código del siguiente ejemplo y ejecútelo.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Para usar Hudi con Amazon EMR Notebooks, primero debes copiar los archivos jar de Hudi del sistema de archivos local al HDFS nodo principal del clúster de cuadernos. A continuación, utilice el editor de cuadernos para configurar su EMR portátil de forma que utilice Hudi.

Para usar Hudi con Amazon Notebooks EMR
  1. Crea y lanza un clúster para Amazon EMR Notebooks. Para obtener más información, consulta Cómo crear EMR clústeres de Amazon para blocs de notas en la Amazon EMR Management Guide.

  2. Conéctese al nodo principal del clúster mediante los archivos jar del sistema de archivos local SSH y, a continuación, cópielos HDFS como se muestra en los siguientes ejemplos. En el ejemplo, creamos un directorio HDFS para facilitar la administración de archivos. Si lo deseaHDFS, puede elegir su propio destino en.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. Abra el editor de bloc de notas, escriba el código del siguiente ejemplo y ejecútelo.

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Iniciar una sesión de Spark para Hudi

Al usar Scala, debe importar las siguientes clases a su sesión de Spark. Esto debe hacerse una vez por cada sesión de Spark.

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

Escribir en un conjunto de datos de Hudi

Los siguientes ejemplos muestran cómo crear un conjunto de datos de Hudi DataFrame y cómo escribirlo como tal.

nota

Para pegar muestras de código en el shell de Spark, escriba :paste en el símbolo del sistema, pegue el ejemplo y, a continuación, pulse CTRL + D.

Cada vez que escribas DataFrame a en un conjunto de datos de Hudi, debes especificarlo. DataSourceWriteOptions Es probable que muchas de estas opciones sean idénticas en varias operaciones de escritura. En el ejemplo siguiente se especifican las opciones comunes mediante la variable hudiOptions, que se usa en los ejemplos posteriores.

nota

Amazon EMR 6.7.0 usa Apache Hudi 0.11.0-amzn-0, que contiene mejoras significativas con respecto a las versiones anteriores de Hudi. Para obtener más información, consulte la Guía de migración a Apache Hudi 0.11.0. Los ejemplos de esta pestaña reflejan estos cambios.

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')
nota

Es posible que vea “hoodie” en lugar de Hudi en los ejemplos de código y las notificaciones. El código base de Hudi utiliza ampliamente la antigua palabra “hoodie”.

DataSourceWriteOptions referencia para Hudi
Opción Descripción

TABLE_NAME

Nombre de la tabla bajo la que se registra el dataset.

TABLE_TYPE_OPT_KEY

Opcional. Especifica si el dataset que se va a crear será de tipo "COPY_ON_WRITE" o "MERGE_ON_READ". El valor predeterminado es "COPY_ON_WRITE".

RECORDKEY_FIELD_OPT_KEY

Campo de clave de registro cuyo valor se utilizará como componente recordKey de HoodieKey. El valor real se obtendrá invocando .toString() para el valor del campo. Los campos anidados se pueden especificar mediante la notación de puntos; por ejemplo, a.b.c.

PARTITIONPATH_FIELD_OPT_KEY

Campo de la ruta de partición cuyo valor se utilizará como componente partitionPath de HoodieKey. El valor real se obtendrá invocando .toString() para el valor del campo.

PRECOMBINE_FIELD_OPT_KEY

Campo utilizado en la combinación previa antes de la escritura real. Cuando dos registros tienen el mismo valor de clave, Hudi selecciona el que tiene el valor mayor para el campo de combinación previa, según lo determinado por Object.compareTo(..).

Las siguientes opciones solo son necesarias para registrar la tabla del conjunto de datos de Hudi en su metaalmacén. Si no registra el conjunto de datos de Hudi como tabla en el metaalmacén de Hudi, estas opciones no son necesarias.

DataSourceWriteOptions referencia para Hive
Opción Descripción

HIVE_DATABASE_OPT_KEY

Base de datos Hive con la que se realizará la sincronización. El valor predeterminado es "default".

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

Clase utilizada para extraer valores de los campos de partición a las columnas de partición de Hive.

HIVE_PARTITION_FIELDS_OPT_KEY

Campo del dataset que se utilizará para determinar las columnas de partición de Hive.

HIVE_SYNC_ENABLED_OPT_KEY

Cuando se establece en "true", registra el dataset en el metaalmacén de Apache Hive. El valor predeterminado es "false".

HIVE_TABLE_OPT_KEY

Obligatorio. Nombre de la tabla de Hive con la que se va a realizar la sincronización. Por ejemplo, "my_hudi_table_cow".

HIVE_USER_OPT_KEY

Opcional. Nombre de usuario de Hive que se va a utilizar al sincronizar. Por ejemplo, "hadoop".

HIVE_PASS_OPT_KEY

Opcional. Contraseña de Hive para el usuario especificado por HIVE_USER_OPT_KEY.

HIVE_URL_OPT_KEY

La metatienda Hive. URL

Datos de upsert

El siguiente ejemplo muestra cómo alterar los datos escribiendo un. DataFrame A diferencia del ejemplo de inserción anterior, el valor de OPERATION_OPT_KEY se establece en UPSERT_OPERATION_OPT_VAL. Además, se especifica .mode(SaveMode.Append) para indicar que el registro se debe anexar.

nota

Amazon EMR 6.7.0 usa Apache Hudi 0.11.0-amzn-0, que contiene mejoras significativas con respecto a las versiones anteriores de Hudi. Para obtener más información, consulte la Guía de migración a Apache Hudi 0.11.0. Los ejemplos de esta pestaña reflejan estos cambios.

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

Eliminación de un registro

Para eliminar un registro de forma permanente, puede alterar una carga útil vacía. En este caso, la opción PAYLOAD_CLASS_OPT_KEY especifica la clase EmptyHoodieRecordPayload. En el ejemplo se utiliza lo mismo DataFrame que en el ejemplo de upsert para especificar el mismo registro. updateDF

nota

Amazon EMR 6.7.0 usa Apache Hudi 0.11.0-amzn-0, que contiene mejoras significativas con respecto a las versiones anteriores de Hudi. Para obtener más información, consulte la Guía de migración a Apache Hudi 0.11.0. Los ejemplos de esta pestaña reflejan estos cambios.

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://DOC-EXAMPLE-BUCKET/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://DOC-EXAMPLE-BUCKET/myhudidataset/')

También puede eliminar datos de forma permanente si configura OPERATION_OPT_KEY como DELETE_OPERATION_OPT_VAL para eliminar todos los registros del conjunto de datos que envíe. Para obtener instrucciones sobre cómo llevar a cabo eliminaciones temporales y para obtener más información sobre cómo eliminar los datos almacenados en las tablas de Hudi, consulte Eliminaciones en la documentación de Apache Hudi.

Leer un conjunto de datos de Hudi

Para recuperar los datos en el momento actual, Hudi lleva a cabo consultas instantáneas de forma predeterminada. El siguiente es un ejemplo de consulta del conjunto de datos escrito en S3 en Escribir en un conjunto de datos de Hudi. Reemplazar s3://DOC-EXAMPLE-BUCKET/myhudidataset con la ruta de la tabla y añada asteriscos comodín para cada nivel de partición, además de un asterisco adicional. En este ejemplo, hay un nivel de partición, por lo que hemos agregado dos símbolos comodín.

nota

Amazon EMR 6.7.0 usa Apache Hudi 0.11.0-amzn-0, que contiene mejoras significativas con respecto a las versiones anteriores de Hudi. Para obtener más información, consulte la Guía de migración a Apache Hudi 0.11.0. Los ejemplos de esta pestaña reflejan estos cambios.

val snapshotQueryDF = spark.read .format("hudi") .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset' + '/*/*') snapshotQueryDF.show()

Consultas incrementales

También puede hacer consultas incrementales con Hudi para obtener un flujo de registros que han cambiado desde una fecha de confirmación determinada. Para ello, defina el campo QUERY_TYPE_OPT_KEY en QUERY_TYPE_INCREMENTAL_OPT_VAL. A continuación, agregue un valor para BEGIN_INSTANTTIME_OPT_KEY para obtener todos los registros escritos desde el momento especificado. Las consultas incrementales suelen ser diez veces más eficientes que las consultas por lotes, ya que solo procesan los registros modificados.

Al hacer consultas incrementales, utilice la ruta de la tabla raíz (base) sin los asteriscos comodín que se utilizan en las consultas instantáneas.

nota

Presto no admite consultas incrementales.

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://DOC-EXAMPLE-BUCKET/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://DOC-EXAMPLE-BUCKET/myhudidataset') incQueryDF.show()

Para obtener más información acerca de la lectura de los conjuntos de datos de Hudi, consulte la sección sobre Cómo consultar tablas de Hudi en la documentación de Apache Hudi.