Atualizar esquemas e adicionar novas partições ao Catálogo de Dados em trabalhos do AWS Glue ETL - AWS Glue

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Atualizar esquemas e adicionar novas partições ao Catálogo de Dados em trabalhos do AWS Glue ETL

O trabalho de extração, transformação e carregamento (ETL) pode criar partições de tabelas no armazenamento de dados de destino. Os esquemas do conjunto de dados podem evoluir e divergir do esquema do esquema do AWS Glue Data Catalog ao longo do tempo. Os trabalhos de ETL do AWS Glue agora oferecem vários recursos que podem ser usados no script de ETL para atualizar os esquemas e as partições no Data Catalog. Esses recursos permitem visualizar os resultados do trabalho de ETL no Data Catalog sem precisar executar o crawler novamente.

Novas partições

Se quiser visualizar as novas partições no AWS Glue Data Catalog, é possível realizar um dos seguintes procedimentos:

  • Quando o trabalho terminar, execute novamente o crawler e exiba as novas partições no console quando o crawler terminar.

  • Quando o trabalho terminar, exiba as novas partições no console imediatamente, sem precisar executar novamente o crawler. Você pode habilitar esse recurso adicionando algumas linhas de código ao seu script de ETL, como exibido nos exemplos a seguir. O código usa o argumento enableUpdateCatalog para indicar que o Data Catalog deve ser atualizado durante a execução do trabalho à medida que as novas partições são criadas.

Método 1

Passe enableUpdateCatalog e partitionKeys em um argumento de opções.

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
Método 2

Passe enableUpdateCatalog e partitionKeys em getSink() e chame o objeto setCatalogInfo() no DataSink.

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

Agora, é possível criar novas tabelas de catálogo, atualizar tabelas existentes com esquema modificado e adicionar novas partições de tabelas no Data Catalog usando um trabalho de ETL do AWS Glue, sem a necessidade de executar crawlers novamente.

Atualizar esquema da tabela

Se você quiser substituir o esquema da tabela do Data Catalog, é possível realizar um dos seguintes procedimentos:

  • Quando o trabalho for concluído, execute o crawler novamente e verifique se está configurado para atualizar a definição da tabela também. Visualize as partições no console junto com atualizações do esquema, quando o crawler finalizar. Para obter mais informações, consulte Configurar um crawler com a API.

  • Quando o trabalho for concluído, visualize o esquema modificado no console imediatamente, sem precisar executar o crawler novamente. Você pode habilitar esse recurso adicionando algumas linhas de código ao seu script de ETL, como exibido nos exemplos a seguir. O código usa enableUpdateCatalog definido como true e updateBehavior definido como UPDATE_IN_DATABASE, o que indica substituir o esquema e adicionar novas partições no Data Catalog durante a execução do trabalho.

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

Também é possível definir o valor de updateBehavior como LOG se quiser impedir que o esquema da tabela seja substituído, mas ainda quiser adicionar novas partições. O valor padrão de updateBehavior é UPDATE_IN_DATABASE, portanto, se você não defini-lo explicitamente, o esquema da tabela será substituído.

Se enableUpdateCatalog não estiver definido como true, independentemente da opção selecionada para updateBehavior, o trabalho de ETL não atualizará a tabela no Data Catalog.

Criar novas tabelas

Também é possível usar as mesmas opções para criar uma tabela no Data Catalog. Também é possível especificar o banco de dados e o nome da nova tabela usando setCatalogInfo.

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

Restrições

Observe as seguintes restrições:

  • Somente os destinos do Amazon Simple Storage Service (Amazon S3) são suportados.

  • O atributo enableUpdateCatalog é suportado para tabelas governadas.

  • Somente os seguintes formatos são compatíveis: json, csv, avro e parquet.

  • Para criar ou atualizar tabelas com a classificação parquet, você deve utilizar o gravador de parquet do AWS Glue otimizado para DynamicFrames. Isso pode ser feito de uma das seguintes maneiras:

    • se você estiver atualizando uma tabela existente no catálogo com classificação parquet, a tabela deve ter a propriedade de tabela "useGlueParquetWriter" definida como true antes de você atualizá-la. Você pode definir essa propriedade por meio de APIs/SDK do AWS Glue, por meio do console ou por meio de uma instrução DDL do Athena.

      Campo de edição de propriedade da tabela de catálogo no console do AWS Glue.

      Depois que a propriedade da tabela do catálogo estiver definida, você poderá usar o seguinte trecho de código para atualizar a tabela do catálogo com os novos dados:

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • Se a tabela ainda não existir no catálogo, você pode utilizar o método getSink() em seu script com connection_type="s3" para adicionar a tabela e suas partições ao catálogo, além de gravar os dados no Amazon S3. Forneça as partitionKeys e a compression para seu fluxo de trabalho.

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=true) s3sink.writeFrame(frameToWrite)
    • O valor de formato glueparquet é um método herdado para habilitar o gravador parquet AWS Glue.

  • Quando updateBehavior for definido como LOG, as novas partições serão adicionadas somente se o esquema DynamicFrame for equivalente a, ou contiver, um subconjunto das colunas definidas no esquema da tabela do Data Catalog.

  • As atualizações de esquema não são aceitas para tabelas não particionadas (que não usam a opção “partitionKeys”).

  • As partitionKeys devem ser equivalentes e estar na mesma ordem, entre o parâmetro informado no script de ETL e as partitionKeys no esquema da tabela do Data Catalog.

  • Atualmente, esse recurso ainda não suporta atualização/criação de tabelas nas quais os esquemas de atualização são aninhados (por exemplo, matrizes dentro de estruturas).

Para ter mais informações, consulte Programar scripts do Spark.