As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Trabalhe com um conjunto de dados do Hudi
O Hudi oferece suporte à inserção, atualização e exclusão de dados em conjuntos de dados do Hudi por meio do Spark. Para obter mais informações, consulte Gravar tabelas do Hudi
Os exemplos a seguir demonstram como iniciar o shell interativo do Spark, usar o envio do Spark ou usar os Cadernos do Amazon EMR para trabalhar com o Hudi no Amazon EMR. Você também pode usar o DeltaStreamer utilitário Hudi ou outras ferramentas para gravar em um conjunto de dados. Ao longo desta seção, os exemplos demonstram como trabalhar com conjuntos de dados usando o shell do Spark durante a conexão com o nó principal usando SSH como usuário padrão hadoop
.
Ao executar spark-shell
, spark-submit
ou spark-sql
ao usar as versões 6.7.0 ou posteriores do Amazon EMR , passe os comandos a seguir.
nota
O Amazon EMR 6.7.0 usa o Apache Hudi
Abrir o shell do Spark no nó primário
-
Conectar-se ao nó primário usando SSH. Para obter mais informações, consulte Connect to the primary node using SSH no Guia de gerenciamento do Amazon EMR.
-
Digite o seguinte comando para iniciar o shell do Spark. Para usar a PySpark concha,
spark-shell
substitua porpyspark
.spark-shell
--jars /usr/lib/hudi/hudi-spark-bundle.jar \ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" \ --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension"
Ao executar spark-shell
, spark-submit
ou spark-sql
ao usar as versões 6.6.x ou anteriores do Amazon EMR , passe os comandos a seguir.
nota
-
O Amazon EMR 6.2 e as versões 5.31 e posteriores (versões 0.6.x e posteriores do Hudi) podem omitir o
spark-avro.jar
da configuração. -
O Amazon EMR 6.5 e as versões 5.35 e posteriores (versões 0.9.x e posteriores do Hudi) podem omitir o
spark.sql.hive.convertMetastoreParquet=false
da configuração. -
O Amazon EMR 6.6 e as versões 5.36 e posteriores (versões 0.10.x e posteriores do Hudi) devem incluir a configuração
HoodieSparkSessionExtension
conforme descrito na Versão: Guia do Spark 0.10.0: --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" \
Abrir o shell do Spark no nó primário
-
Conectar-se ao nó primário usando SSH. Para obter mais informações, consulte Connect to the primary node using SSH no Guia de gerenciamento do Amazon EMR.
-
Digite o seguinte comando para iniciar o shell do Spark. Para usar a PySpark concha,
spark-shell
substitua porpyspark
.spark-shell
\ --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \ --conf "spark.sql.hive.convertMetastoreParquet=false" \ --jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar
Para usar o Hudi com os Cadernos do Amazon EMR, você deve primeiro copiar os arquivos jar do Hudi do sistema de arquivos local para o HDFS no nó principal do cluster do caderno. Em seguida, use o editor de caderno para configurar o caderno do EMR para usar o Hudi.
Usar o Hudi com os Cadernos do Amazon EMR
-
Crie e inicie um cluster para Cadernos do Amazon EMR. Para obter mais informações, consulte Creating Amazon EMR clusters for notebooks no Guia de gerenciamento do Amazon EMR.
-
Conecte-se ao nó principal do cluster usando SSH e copie os arquivos jar do sistema de arquivos local para o HDFS, conforme mostrado nos exemplos a seguir. No exemplo, criamos um diretório no HDFS para fins de clareza do gerenciamento de arquivos. É possível escolher seu próprio destino no HDFS, se desejar.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
-
Abra o editor do notebook, insira o código do exemplo a seguir e execute-o.
%%configure { "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.hudi.catalog.HoodieCatalog", "spark.sql.extensions":"org.apache.spark.sql.hudi.HoodieSparkSessionExtension" }}
Para usar o Hudi com os Cadernos do Amazon EMR, você deve primeiro copiar os arquivos jar do Hudi do sistema de arquivos local para o HDFS no nó principal do cluster do caderno. Em seguida, use o editor de caderno para configurar o caderno do EMR para usar o Hudi.
Usar o Hudi com os Cadernos do Amazon EMR
-
Crie e inicie um cluster para Cadernos do Amazon EMR. Para obter mais informações, consulte Creating Amazon EMR clusters for notebooks no Guia de gerenciamento do Amazon EMR.
-
Conecte-se ao nó principal do cluster usando SSH e copie os arquivos jar do sistema de arquivos local para o HDFS, conforme mostrado nos exemplos a seguir. No exemplo, criamos um diretório no HDFS para fins de clareza do gerenciamento de arquivos. É possível escolher seu próprio destino no HDFS, se desejar.
hdfs dfs -mkdir -p /apps/hudi/lib
hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
-
Abra o editor do notebook, insira o código do exemplo a seguir e execute-o.
{ "conf": { "spark.jars":"hdfs:///apps/hudi/lib/hudi-spark-bundle.jar,hdfs:///apps/hudi/lib/spark-avro.jar", "spark.serializer":"org.apache.spark.serializer.KryoSerializer", "spark.sql.hive.convertMetastoreParquet":"false" }}
Inicializar uma sessão do Spark para Hudi
Ao usar o Scala, você deve importar as seguintes classes na sessão do Spark. Isso precisa ser feito uma vez por sessão do Spark.
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.DataSourceReadOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.HiveSyncConfig
import org.apache.hudi.sync.common.HoodieSyncConfig
Gravar em um conjunto de dados do Hudi
Os exemplos a seguir mostram como criar um DataFrame e escrevê-lo como um conjunto de dados Hudi.
nota
Para colar exemplos de código no shell do Spark, digite :paste
no prompt, cole o exemplo e pressione CTRL
+ D
.
Cada vez que você grava um DataFrame em um conjunto de dados Hudi, você deve especificar. DataSourceWriteOptions
Muitas dessas opções provavelmente serão idênticas entre as operações de gravação. O exemplo a seguir especifica opções comuns usando a variável
, usada pelos exemplos subsequentes.hudiOptions
nota
O Amazon EMR 6.7.0 usa o Apache Hudi
// Create a DataFrame
val inputDF = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
).toDF("id", "creation_date", "last_update_time")
//Specify common DataSourceWriteOptions in the single hudiOptions variable
val hudiOptions = Map[String,String](
HoodieWriteConfig.TBL_NAME.key -> "tableName
",
DataSourceWriteOptions.TABLE_TYPE.key -> "COPY_ON_WRITE",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
HoodieSyncConfig.META_SYNC_PARTITION_EXTRACTOR_CLASS.key -> "org.apache.hudi.hive.MultiPartKeysValueExtractor",
HoodieSyncConfig.META_SYNC_ENABLED.key -> "true",
HiveSyncConfig.HIVE_SYNC_MODE.key -> "hms",
HoodieSyncConfig.META_SYNC_TABLE_NAME.key -> "tableName
",
HoodieSyncConfig.META_SYNC_PARTITION_FIELDS.key -> "creation_date"
)
// Write the DataFrame as a Hudi dataset
(inputDF.write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY,"insert")
.mode(SaveMode.Overwrite)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a DataFrame
val inputDF = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
("105", "2015-01-02", "2015-01-01T13:51:42.248818Z")
).toDF("id", "creation_date", "last_update_time")
//Specify common DataSourceWriteOptions in the single hudiOptions variable
val hudiOptions = Map[String,String](
HoodieWriteConfig.TABLE_NAME -> "tableName
",
DataSourceWriteOptions.TABLE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "id",
DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "creation_date",
DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "last_update_time",
DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "tableName
",
DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "creation_date",
DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName
)
// Write the DataFrame as a Hudi dataset
(inputDF.write
.format("org.apache.hudi")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.INSERT_OPERATION_OPT_VAL)
.options(hudiOptions)
.mode(SaveMode.Overwrite)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
# Create a DataFrame
inputDF = spark.createDataFrame(
[
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z"),
("104", "2015-01-02", "2015-01-01T12:15:00.512679Z"),
("105", "2015-01-02", "2015-01-01T13:51:42.248818Z"),
],
["id", "creation_date", "last_update_time"]
)
# Specify common DataSourceWriteOptions in the single hudiOptions variable
hudiOptions = {
'hoodie.table.name': 'tableName
',
'hoodie.datasource.write.recordkey.field': 'id',
'hoodie.datasource.write.partitionpath.field': 'creation_date',
'hoodie.datasource.write.precombine.field': 'last_update_time',
'hoodie.datasource.hive_sync.enable': 'true',
'hoodie.datasource.hive_sync.table': 'tableName
',
'hoodie.datasource.hive_sync.partition_fields': 'creation_date',
'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor'
}
# Write a DataFrame as a Hudi dataset
inputDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'insert') \
.options(**hudiOptions) \
.mode('overwrite') \
.save('s3://amzn-s3-demo-bucket/myhudidataset/
')
nota
Você pode ver “hoodie” em vez de Hudi em exemplos de código e notificações. A base de código do Hudi usa amplamente a antiga grafia “hoodie”.
Opção | Descrição |
---|---|
TABLE_NAME |
O nome da tabela com o qual registrar o conjunto de dados. |
TABLE_TYPE_OPT_KEY |
Opcional. Especifica se o conjunto de dados foi criado como |
RECORDKEY_FIELD_OPT_KEY |
O campo de chave de registro cujo valor será usado como o componente |
PARTITIONPATH_FIELD_OPT_KEY |
O campo de caminho de partição cujo valor será usado como o componente |
PRECOMBINE_FIELD_OPT_KEY |
O campo usado na pré-combinação antes da gravação real. Quando dois registros têm o mesmo valor de chave, o Hudi seleciona aquele com o maior valor para o campo de pré-combinação, conforme determinado por |
As opções a seguir são necessárias apenas para registrar a tabela do conjunto de dados do Hudi no seu metastore. Se você não registrar o conjunto de dados do Hudi como uma tabela no metastore do Hive, essas opções não serão necessárias.
Opção | Descrição |
---|---|
HIVE_DATABASE_OPT_KEY |
O banco de dados do Hive com o qual sincronizar. O padrão é |
HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY |
A classe usada para extrair valores de campo de partição em colunas de partição do Hive. |
HIVE_PARTITION_FIELDS_OPT_KEY |
O campo no conjunto de dados a ser usado para determinar colunas de partição do Hive. |
HIVE_SYNC_ENABLED_OPT_KEY |
Quando definido como |
HIVE_TABLE_OPT_KEY |
Obrigatório. O nome da tabela no Hive com a qual sincronizar. Por exemplo, |
HIVE_USER_OPT_KEY |
Opcional. O nome de usuário do Hive a ser usado ao sincronizar. Por exemplo, |
HIVE_PASS_OPT_KEY |
Opcional. A senha do Hive para o usuário especificado por |
HIVE_URL_OPT_KEY |
O URL do metastore do Hive. |
Upsert dados
O exemplo a seguir demonstra como alterar dados escrevendo um. DataFrame Ao contrário do exemplo anterior de inserção, o valor OPERATION_OPT_KEY
é definido como UPSERT_OPERATION_OPT_VAL
. Além disso, .mode(SaveMode.Append)
é especificado para indicar que o registro deve ser anexado.
nota
O Amazon EMR 6.7.0 usa o Apache Hudi
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value"))
(updateDF.write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, "upsert")
.mode(SaveMode.Append)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
// Create a new DataFrame from the first row of inputDF with a different creation_date value
val updateDF = inputDF.limit(1).withColumn("creation_date", lit("new_value
"))
(updateDF.write
.format("org.apache.hudi")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.options(hudiOptions)
.mode(SaveMode.Append)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
from pyspark.sql.functions import lit
# Create a new DataFrame from the first row of inputDF with a different creation_date value
updateDF = inputDF.limit(1).withColumn('creation_date', lit('new_value
'))
updateDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'upsert') \
.options(**hudiOptions) \
.mode('append') \
.save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Excluir um registro
Para excluir um registro de forma irreversível, você pode upsert uma carga útil vazia. Nesse caso, a opção PAYLOAD_CLASS_OPT_KEY
especifica a classe EmptyHoodieRecordPayload
. O exemplo usa o mesmo DataFrame,updateDF
, usado no exemplo upsert para especificar o mesmo registro.
nota
O Amazon EMR 6.7.0 usa o Apache Hudi
(updateDF.write
.format("hudi")
.options(hudiOptions)
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, "delete")
.mode(SaveMode.Append)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
(updateDF.write
.format("org.apache.hudi")
.option(DataSourceWriteOptions.OPERATION_OPT_KEY, DataSourceWriteOptions.UPSERT_OPERATION_OPT_VAL)
.option(DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY, "org.apache.hudi.common.model.EmptyHoodieRecordPayload")
.mode(SaveMode.Append)
.save("s3://amzn-s3-demo-bucket/myhudidataset/
"))
updateDF.write \
.format('org.apache.hudi') \
.option('hoodie.datasource.write.operation', 'upsert') \
.option('hoodie.datasource.write.payload.class', 'org.apache.hudi.common.model.EmptyHoodieRecordPayload') \
.options(**hudiOptions) \
.mode('append') \
.save('s3://amzn-s3-demo-bucket/myhudidataset/
')
Você também pode excluir dados de forma irreversível definindo OPERATION_OPT_KEY
como DELETE_OPERATION_OPT_VAL
para remover todos os registros no conjunto de dados enviado. Para obter instruções sobre como realizar exclusões reversíveis e obter mais informações sobre a exclusão de dados armazenados em tabelas do Hudi, consulte Exclusões
Ler em um conjunto de dados do Hudi
Para recuperar dados no momento atual, o Hudi realiza consultas de snapshots por padrão. Veja a seguir um exemplo de consulta do conjunto de dados gravado no S3 em Gravar em um conjunto de dados do Hudi. s3://amzn-s3-demo-bucket/myhudidataset
Substitua pelo caminho da tabela e adicione asteriscos curinga para cada nível de partição, além de um asterisco adicional. Neste exemplo, há um nível de partição, portanto adicionamos dois símbolos curinga.
nota
O Amazon EMR 6.7.0 usa o Apache Hudi
val snapshotQueryDF = spark.read
.format("hudi")
.load("s3://amzn-s3-demo-bucket/myhudidataset"
)
.show()
(val snapshotQueryDF = spark.read
.format("org.apache.hudi")
.load("s3://amzn-s3-demo-bucket/myhudidataset
" + "/*/*"))
snapshotQueryDF.show()
snapshotQueryDF = spark.read \
.format('org.apache.hudi') \
.load('s3://amzn-s3-demo-bucket/myhudidataset
' + '/*/*')
snapshotQueryDF.show()
Consultas incrementais
Você também pode realizar consultas incrementais com o Hudi para obter um fluxo de registros que foram alterados desde um determinado carimbo de data/hora de confirmação. Para fazer isso, defina o campo QUERY_TYPE_OPT_KEY
como QUERY_TYPE_INCREMENTAL_OPT_VAL
. Em seguida, adicione um valor para BEGIN_INSTANTTIME_OPT_KEY
para obter todos os registros gravados desde a hora especificada. Normalmente, as consultas incrementais são dez vezes mais eficientes do que as de lote, pois processam somente registros alterados.
Ao realizar consultas incrementais, use o caminho da tabela raiz (básica) sem os asteriscos curinga usados nas consultas Snapshot.
nota
O Presto não é compatível com consultas incrementais.
(val incQueryDF = spark.read
.format("org.apache.hudi")
.option(DataSourceReadOptions.QUERY_TYPE_OPT_KEY, DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL)
.option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, <beginInstantTime>
)
.load("s3://amzn-s3-demo-bucket/myhudidataset
" ))
incQueryDF.show()
readOptions = {
'hoodie.datasource.query.type': 'incremental',
'hoodie.datasource.read.begin.instanttime': <beginInstantTime>
,
}
incQueryDF = spark.read \
.format('org.apache.hudi') \
.options(**readOptions) \
.load('s3://amzn-s3-demo-bucket/myhudidataset
')
incQueryDF.show()
Para obter mais informações sobre a leitura de conjuntos de dados do Hudi, consulte Consultar tabelas do Hudi