Selecione suas preferências de cookies

Usamos cookies essenciais e ferramentas semelhantes que são necessárias para fornecer nosso site e serviços. Usamos cookies de desempenho para coletar estatísticas anônimas, para que possamos entender como os clientes usam nosso site e fazer as devidas melhorias. Cookies essenciais não podem ser desativados, mas você pode clicar em “Personalizar” ou “Recusar” para recusar cookies de desempenho.

Se você concordar, a AWS e terceiros aprovados também usarão cookies para fornecer recursos úteis do site, lembrar suas preferências e exibir conteúdo relevante, incluindo publicidade relevante. Para aceitar ou recusar todos os cookies não essenciais, clique em “Aceitar” ou “Recusar”. Para fazer escolhas mais detalhadas, clique em “Personalizar”.

Trabalhe com um conjunto de dados do Hudi - Amazon EMR

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Trabalhe com um conjunto de dados do Hudi

O Hudi oferece suporte à inserção, atualização e exclusão de dados em conjuntos de dados do Hudi por meio do Spark. Para obter mais informações, consulte Gravar tabelas do Hudi na documentação do Apache Hudi.

Os exemplos a seguir demonstram como iniciar o shell interativo do Spark, usar o envio do Spark ou usar os Cadernos do Amazon EMR para trabalhar com o Hudi no Amazon EMR. Você também pode usar o DeltaStreamer utilitário Hudi ou outras ferramentas para gravar em um conjunto de dados. Ao longo desta seção, os exemplos demonstram como trabalhar com conjuntos de dados usando o shell do Spark durante a conexão com o nó principal usando SSH como usuário padrão hadoop.

Ao executar spark-shell, spark-submit ou spark-sql ao usar as versões 6.7.0 ou posteriores do Amazon EMR , passe os comandos a seguir.

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

Abrir o shell do Spark no nó primário
  1. Conectar-se ao nó primário usando SSH. Para obter mais informações, consulte Connect to the primary node using SSH no Guia de gerenciamento do Amazon EMR.

  2. Digite o seguinte comando para iniciar o shell do Spark. Para usar a PySpark concha, spark-shell substitua porpyspark.

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Ao executar spark-shell, spark-submit ou spark-sql ao usar as versões 6.7.0 ou posteriores do Amazon EMR , passe os comandos a seguir.

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

Abrir o shell do Spark no nó primário
  1. Conectar-se ao nó primário usando SSH. Para obter mais informações, consulte Connect to the primary node using SSH no Guia de gerenciamento do Amazon EMR.

  2. Digite o seguinte comando para iniciar o shell do Spark. Para usar a PySpark concha, spark-shell substitua porpyspark.

    spark-shell --jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"

Ao executar spark-shell, spark-submit ou spark-sql ao usar as versões 6.6.x ou anteriores do Amazon EMR , passe os comandos a seguir.

nota
  • O Amazon EMR 6.2 e as versões 5.31 e posteriores (versões 0.6.x e posteriores do Hudi) podem omitir o spark-avro.jar da configuração.

  • O Amazon EMR 6.5 e as versões 5.35 e posteriores (versões 0.9.x e posteriores do Hudi) podem omitir o spark.sql.hive.convertMetastoreParquet=false da configuração.

  • O Amazon EMR 6.6 e as versões 5.36 e posteriores (versões 0.10.x e posteriores do Hudi) devem incluir a configuração HoodieSparkSessionExtension conforme descrito na Versão: Guia do Spark 0.10.0:

    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Abrir o shell do Spark no nó primário
  1. Conectar-se ao nó primário usando SSH. Para obter mais informações, consulte Connect to the primary node using SSH no Guia de gerenciamento do Amazon EMR.

  2. Digite o seguinte comando para iniciar o shell do Spark. Para usar a PySpark concha, spark-shell substitua porpyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Ao executar spark-shell, spark-submit ou spark-sql ao usar as versões 6.6.x ou anteriores do Amazon EMR , passe os comandos a seguir.

nota
  • O Amazon EMR 6.2 e as versões 5.31 e posteriores (versões 0.6.x e posteriores do Hudi) podem omitir o spark-avro.jar da configuração.

  • O Amazon EMR 6.5 e as versões 5.35 e posteriores (versões 0.9.x e posteriores do Hudi) podem omitir o spark.sql.hive.convertMetastoreParquet=false da configuração.

  • O Amazon EMR 6.6 e as versões 5.36 e posteriores (versões 0.10.x e posteriores do Hudi) devem incluir a configuração HoodieSparkSessionExtension conforme descrito na Versão: Guia do Spark 0.10.0:

    --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Abrir o shell do Spark no nó primário
  1. Conectar-se ao nó primário usando SSH. Para obter mais informações, consulte Connect to the primary node using SSH no Guia de gerenciamento do Amazon EMR.

  2. Digite o seguinte comando para iniciar o shell do Spark. Para usar a PySpark concha, spark-shell substitua porpyspark.

    spark-shell \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar

Para usar o Hudi com os Cadernos do Amazon EMR, você deve primeiro copiar os arquivos jar do Hudi do sistema de arquivos local para o HDFS no nó principal do cluster do caderno. Em seguida, use o editor de caderno para configurar o caderno do EMR para usar o Hudi.

Usar o Hudi com os Cadernos do Amazon EMR
  1. Crie e inicie um cluster para Cadernos do Amazon EMR. Para obter mais informações, consulte Creating Amazon EMR clusters for notebooks no Guia de gerenciamento do Amazon EMR.

  2. Conecte-se ao nó principal do cluster usando SSH e copie os arquivos jar do sistema de arquivos local para o HDFS, conforme mostrado nos exemplos a seguir. No exemplo, criamos um diretório no HDFS para fins de clareza do gerenciamento de arquivos. É possível escolher seu próprio destino no HDFS, se desejar.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. Abra o editor do notebook, insira o código do exemplo a seguir e execute-o.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Para usar o Hudi com os Cadernos do Amazon EMR, você deve primeiro copiar os arquivos jar do Hudi do sistema de arquivos local para o HDFS no nó principal do cluster do caderno. Em seguida, use o editor de caderno para configurar o caderno do EMR para usar o Hudi.

Usar o Hudi com os Cadernos do Amazon EMR
  1. Crie e inicie um cluster para Cadernos do Amazon EMR. Para obter mais informações, consulte Creating Amazon EMR clusters for notebooks no Guia de gerenciamento do Amazon EMR.

  2. Conecte-se ao nó principal do cluster usando SSH e copie os arquivos jar do sistema de arquivos local para o HDFS, conforme mostrado nos exemplos a seguir. No exemplo, criamos um diretório no HDFS para fins de clareza do gerenciamento de arquivos. É possível escolher seu próprio destino no HDFS, se desejar.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
  3. Abra o editor do notebook, insira o código do exemplo a seguir e execute-o.

    %%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}

Para usar o Hudi com os Cadernos do Amazon EMR, você deve primeiro copiar os arquivos jar do Hudi do sistema de arquivos local para o HDFS no nó principal do cluster do caderno. Em seguida, use o editor de caderno para configurar o caderno do EMR para usar o Hudi.

Usar o Hudi com os Cadernos do Amazon EMR
  1. Crie e inicie um cluster para Cadernos do Amazon EMR. Para obter mais informações, consulte Creating Amazon EMR clusters for notebooks no Guia de gerenciamento do Amazon EMR.

  2. Conecte-se ao nó principal do cluster usando SSH e copie os arquivos jar do sistema de arquivos local para o HDFS, conforme mostrado nos exemplos a seguir. No exemplo, criamos um diretório no HDFS para fins de clareza do gerenciamento de arquivos. É possível escolher seu próprio destino no HDFS, se desejar.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. Abra o editor do notebook, insira o código do exemplo a seguir e execute-o.

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Para usar o Hudi com os Cadernos do Amazon EMR, você deve primeiro copiar os arquivos jar do Hudi do sistema de arquivos local para o HDFS no nó principal do cluster do caderno. Em seguida, use o editor de caderno para configurar o caderno do EMR para usar o Hudi.

Usar o Hudi com os Cadernos do Amazon EMR
  1. Crie e inicie um cluster para Cadernos do Amazon EMR. Para obter mais informações, consulte Creating Amazon EMR clusters for notebooks no Guia de gerenciamento do Amazon EMR.

  2. Conecte-se ao nó principal do cluster usando SSH e copie os arquivos jar do sistema de arquivos local para o HDFS, conforme mostrado nos exemplos a seguir. No exemplo, criamos um diretório no HDFS para fins de clareza do gerenciamento de arquivos. É possível escolher seu próprio destino no HDFS, se desejar.

    hdfs dfs -mkdir -p /apps/hudi/lib
    hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
    hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
  3. Abra o editor do notebook, insira o código do exemplo a seguir e execute-o.

    { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}

Inicializar uma sessão do Spark para Hudi

Ao usar o Scala, você deve importar as seguintes classes na sessão do Spark. Isso precisa ser feito uma vez por sessão do Spark.

import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ import org.apache.hudi.DataSourceWriteOptions import org.apache.hudi.DataSourceReadOptions import org.apache.hudi.config.HoodieWriteConfig import org.apache.hudi.hive.MultiPartKeysValueExtractor import org.apache.hudi.hive.HiveSyncConfig import org.apache.hudi.sync.common.HoodieSyncConfig

Gravar em um conjunto de dados do Hudi

Os exemplos a seguir mostram como criar um DataFrame e escrevê-lo como um conjunto de dados Hudi.

nota

Para colar exemplos de código no shell do Spark, digite :paste no prompt, cole o exemplo e pressione CTRL + D.

Cada vez que você grava um DataFrame em um conjunto de dados Hudi, você deve especificar. DataSourceWriteOptions Muitas dessas opções provavelmente serão idênticas entre as operações de gravação. O exemplo a seguir especifica opções comuns usando a variável hudiOptions, usada pelos exemplos subsequentes.

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TBL_NAME.key -> "tableName", DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor", HoodieSyncConfig.META_SYNC_ENABLED.key -> "true", HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms", HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName", HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date" ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert") .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

// Create a DataFrame val inputDF = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z") ).toDF("id", "creation_date", "last_update_time") //Specify common DataSourceWriteOptions in the single hudiOptions variable val hudiOptions = Map[String,String]( HoodieWriteConfig.TABLE_NAME -> "tableName", DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE", DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id", DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date", DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time", DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true", DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName", DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date", DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName ) // Write the DataFrame as a Hudi dataset (inputDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Overwrite) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

# Create a DataFrame inputDF = spark.createDataFrame( [ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"), ("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"), ("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"), ], ["id", "creation_date", "last_update_time"] ) # Specify common DataSourceWriteOptions in the single hudiOptions variable hudiOptions = { 'hoodie.table.name': 'tableName', 'hoodie.datasource.write.recordkey.field': 'id', 'hoodie.datasource.write.partitionpath.field': 'creation_date', 'hoodie.datasource.write.precombine.field': 'last_update_time', 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.table': 'tableName', 'hoodie.datasource.hive_sync.partition_fields': 'creation_date', 'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor' } # Write a DataFrame as a Hudi dataset inputDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'insert') \ .options(**hudiOptions) \ .mode('overwrite') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')
nota

Você pode ver “hoodie” em vez de Hudi em exemplos de código e notificações. A base de código do Hudi usa amplamente a antiga grafia “hoodie”.

DataSourceWriteOptions referência para Hudi
Opção Descrição

TABLE_NAME

O nome da tabela com o qual registrar o conjunto de dados.

TABLE_TYPE_OPT_KEY

Opcional. Especifica se o conjunto de dados foi criado como "COPY_ON_WRITE" ou "MERGE_ON_READ". O padrão é "COPY_ON_WRITE".

RECORDKEY_FIELD_OPT_KEY

O campo de chave de registro cujo valor será usado como o componente recordKey de HoodieKey. O valor real será obtido invocando .toString() no valor do campo. Campos aninhados podem ser especificados usando a notação de pontos, por exemplo, a.b.c.

PARTITIONPATH_FIELD_OPT_KEY

O campo de caminho de partição cujo valor será usado como o componente partitionPath de HoodieKey. O valor real será obtido invocando .toString() no valor do campo.

PRECOMBINE_FIELD_OPT_KEY

O campo usado na pré-combinação antes da gravação real. Quando dois registros têm o mesmo valor de chave, o Hudi seleciona aquele com o maior valor para o campo de pré-combinação, conforme determinado por Object.compareTo(..).

As opções a seguir são necessárias apenas para registrar a tabela do conjunto de dados do Hudi no seu metastore. Se você não registrar o conjunto de dados do Hudi como uma tabela no metastore do Hive, essas opções não serão necessárias.

DataSourceWriteOptions referência para Hive
Opção Descrição

HIVE_DATABASE_OPT_KEY

O banco de dados do Hive com o qual sincronizar. O padrão é "default".

HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY

A classe usada para extrair valores de campo de partição em colunas de partição do Hive.

HIVE_PARTITION_FIELDS_OPT_KEY

O campo no conjunto de dados a ser usado para determinar colunas de partição do Hive.

HIVE_SYNC_ENABLED_OPT_KEY

Quando definido como "true", registra o conjunto de dados no metastore do Apache Hive. O padrão é "false".

HIVE_TABLE_OPT_KEY

Obrigatório. O nome da tabela no Hive com a qual sincronizar. Por exemplo, "my_hudi_table_cow".

HIVE_USER_OPT_KEY

Opcional. O nome de usuário do Hive a ser usado ao sincronizar. Por exemplo, "hadoop".

HIVE_PASS_OPT_KEY

Opcional. A senha do Hive para o usuário especificado por HIVE_USER_OPT_KEY.

HIVE_URL_OPT_KEY

O URL do metastore do Hive.

Upsert dados

O exemplo a seguir demonstra como alterar dados escrevendo um. DataFrame Ao contrário do exemplo anterior de inserção, o valor OPERATION_OPT_KEY é definido como UPSERT_OPERATION_OPT_VAL. Além disso, .mode(SaveMode.Append) é especificado para indicar que o registro deve ser anexado.

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

// Create a new DataFrame from the first row of inputDF with a different creation_date value val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value")) (updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .options(hudiOptions) .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

from pyspark.sql.functions import lit # Create a new DataFrame from the first row of inputDF with a different creation_date value updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value')) updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

Excluir um registro

Para excluir um registro de forma irreversível, você pode upsert uma carga útil vazia. Nesse caso, a opção PAYLOAD_CLASS_OPT_KEY especifica a classe EmptyHoodieRecordPayload. O exemplo usa o mesmo DataFrame,updateDF, usado no exemplo upsert para especificar o mesmo registro.

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

(updateDF.write .format("hudi") .options(hudiOptions) .option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))

(updateDF.write .format("org.apache.hudi") .option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL) .option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload") .mode(SaveMode.Append) .save("s3://amzn-s3-demo-bucket/myhudidataset/"))
updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

updateDF.write \ .format('org.apache.hudi') \ .option('hoodie.datasource.write.operation', 'upsert') \ .option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \ .options(**hudiOptions) \ .mode('append') \ .save('s3://amzn-s3-demo-bucket/myhudidataset/')

Você também pode excluir dados de forma irreversível definindo OPERATION_OPT_KEY como DELETE_OPERATION_OPT_VAL para remover todos os registros no conjunto de dados enviado. Para obter instruções sobre como realizar exclusões reversíveis e obter mais informações sobre a exclusão de dados armazenados em tabelas do Hudi, consulte Exclusões na documentação do Apache Hudi.

Ler em um conjunto de dados do Hudi

Para recuperar dados no momento atual, o Hudi realiza consultas de snapshots por padrão. Veja a seguir um exemplo de consulta do conjunto de dados gravado no S3 em Gravar em um conjunto de dados do Hudi. s3://amzn-s3-demo-bucket/myhudidatasetSubstitua pelo caminho da tabela e adicione asteriscos curinga para cada nível de partição, além de um asterisco adicional. Neste exemplo, há um nível de partição, portanto adicionamos dois símbolos curinga.

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

val snapshotQueryDF = spark.read .format("hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset") .show()

nota

O Amazon EMR 6.7.0 usa o Apache Hudi 0.11.0-amzn-0, que contém melhorias significativas em relação às versões anteriores do Hudi. Para obter mais informações, consulte o Guia de migração do Apache Hudi 0.11.0. Os exemplos nesta guia refletem essas alterações.

val snapshotQueryDF = spark.read .format("hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset") .show()
(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*")) snapshotQueryDF.show()

(val snapshotQueryDF = spark.read .format("org.apache.hudi") .load("s3://amzn-s3-demo-bucket/myhudidataset" + "/*/*")) snapshotQueryDF.show()
snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*') snapshotQueryDF.show()

snapshotQueryDF = spark.read \ .format('org.apache.hudi') \ .load('s3://amzn-s3-demo-bucket/myhudidataset' + '/*/*') snapshotQueryDF.show()

Consultas incrementais

Você também pode realizar consultas incrementais com o Hudi para obter um fluxo de registros que foram alterados desde um determinado carimbo de data/hora de confirmação. Para fazer isso, defina o campo QUERY_TYPE_OPT_KEY como QUERY_TYPE_INCREMENTAL_OPT_VAL. Em seguida, adicione um valor para BEGIN_INSTANTTIME_OPT_KEY para obter todos os registros gravados desde a hora especificada. Normalmente, as consultas incrementais são dez vezes mais eficientes do que as de lote, pois processam somente registros alterados.

Ao realizar consultas incrementais, use o caminho da tabela raiz (básica) sem os asteriscos curinga usados nas consultas Snapshot.

nota

O Presto não é compatível com consultas incrementais.

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://amzn-s3-demo-bucket/myhudidataset" )) incQueryDF.show()

(val incQueryDF = spark.read .format("org.apache.hudi") .option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL) .option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>) .load("s3://amzn-s3-demo-bucket/myhudidataset" )) incQueryDF.show()
readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset') incQueryDF.show()

readOptions = { 'hoodie.datasource.query.type': 'incremental', 'hoodie.datasource.read.begin.instanttime': <beginInstantTime>, } incQueryDF = spark.read \ .format('org.apache.hudi') \ .options(**readOptions) \ .load('s3://amzn-s3-demo-bucket/myhudidataset') incQueryDF.show()

Para obter mais informações sobre a leitura de conjuntos de dados do Hudi, consulte Consultar tabelas do Hudi na documentação do Apache Hudi.

PrivacidadeTermos do sitePreferências de cookies
© 2025, Amazon Web Services, Inc. ou suas afiliadas. Todos os direitos reservados.