Creación de tablas, actualización del esquema y agregado de particiones nuevas en el Data Catalog de trabajos de ETL de AWS Glue - AWS Glue

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Creación de tablas, actualización del esquema y agregado de particiones nuevas en el Data Catalog de trabajos de ETL de AWS Glue

Un trabajo de extracción, transformación y carga (ETL) podría crear nuevas particiones de la tabla en el almacén de datos de destino. El esquema del conjunto de datos puede evolucionar y ser cada vez más diferente del esquema del AWS Glue Data Catalog. AWS Glue Los trabajos de ETL ahora cuentan con varias características que se pueden utilizar dentro del script de ETL para actualizar el esquema y las particiones del Data Catalog. Estas características permiten ver los resultados del trabajo de ETL en el Data Catalog sin necesidad de volver a ejecutar el rastreador.

Nuevas particiones

Si desea ver las nuevas particiones del AWS Glue Data Catalog, puede realizar una de las siguientes acciones:

  • Cuando el trabajo termine, vuelva a ejecutar el rastreador y, una vez que finalice, consulte las nuevas particiones en la consola.

  • Cuando el trabajo termine, podrá ver las nuevas particiones en la consola de inmediato sin necesidad de volver a ejecutar el rastreador. Puede habilitar esta característica agregando algunas líneas de código al script ETL, tal y como se muestra en los siguientes ejemplos. El código utiliza el argumento enableUpdateCatalog para indicar que el Data Catalog se va a actualizar durante la ejecución del trabajo a medida que se creen las nuevas particiones.

Método 1

Pase enableUpdateCatalog y partitionKeys en un argumento de opciones.

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
Método 2

Pase enableUpdateCatalog y partitionKeys en getSink() y llame a setCatalogInfo() en el objeto DataSink.

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

Ahora, puede crear nuevas tablas de catálogo, actualizar las tablas existentes con un esquema modificado y agregar nuevas particiones de tabla en el Data Catalog utilizando únicamente un trabajo de ETL de AWS Glue, sin necesidad de volver a ejecutar los rastreadores.

Actualización del esquema de la tabla

Si desea sobrescribir el esquema de la tabla del Data Catalog, puede realizar una de las siguientes acciones:

  • Cuando termine el trabajo, vuelva a ejecutar el rastreador y asegúrese de que esté configurado para actualizar también la definición de la tabla. Cuando el rastreador finalice, consulte las nuevas particiones en la consola junto con las actualizaciones del esquema. Para obtener más información, consulte Configuración de un rastreador con la API.

  • Cuando el trabajo termine, podrá ver de inmediato el esquema modificado en la consola sin necesidad de volver a ejecutar el rastreador. Puede habilitar esta característica agregando algunas líneas de código al script ETL, tal y como se muestra en los siguientes ejemplos. El código utiliza el valor enableUpdateCatalog establecido en verdadero y también el valor updateBehavior establecido en UPDATE_IN_DATABASE, lo que indica que el esquema debe sobrescribirse y deben agregarse nuevas particiones en el Data Catalog durante la ejecución del trabajo.

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

También puede establecer el valor updateBehavior en LOG si no desea que el esquema de la tabla se sobrescriba pero sí quiere agregar las nuevas particiones. El valor predeterminado de updateBehavior es UPDATE_IN_DATABASE, por lo que, si no lo define explícitamente, se sobrescribirá el esquema de la tabla.

Si enableUpdateCatalog no se establece en verdadero, independientemente de la opción seleccionada en updateBehavior, el trabajo de ETL no actualizará la tabla del Data Catalog.

Creación de nuevas tablas

También puede utilizar las mismas opciones para crear una nueva tabla en el Data Catalog. Puede especificar la base de datos y el nuevo nombre de la tabla con setCatalogInfo.

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

Restricciones

Tome nota de las siguientes restricciones:

  • Solo se soportan los destinos de Amazon Simple Storage Service (Amazon S3).

  • La característica enableUpdateCatalog no es compatible con las tablas gobernadas.

  • Solo se admiten los siguientes formatos: json, csv, avro y parquet.

  • Para crear o actualiza tablas con la clasificación parquet, debe utilizar el escritor de parquet optimizado para DynamicFrames de AWS Glue. Esto se puede lograr con uno de los siguientes:

    • Si va a actualizar una tabla existente en el catálogo con una clasificación parquet, la tabla debe tener la propiedad de tabla "useGlueParquetWriter" establecida en true antes de actualizarla. Puede configurar esta propiedad a través de las API/SDK de AWS Glue, la consola o una instrucción DDL de Athena.

      
                            Campo de edición de propiedades de la tabla de catálogo en la consola AWS Glue.

      Una vez establecida la propiedad de la tabla de catálogo, puede utilizar el siguiente fragmento de código para actualizar la tabla de catálogo con los nuevos datos:

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • Si la tabla aún no existe en el catálogo, puede utilizar el método getSink() del script con connection_type="s3" para agregar la tabla y sus particiones al catálogo, además de escribir los datos en Amazon S3. Proporcione la partitionKeys adecuada y la compression para su flujo de trabajo.

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=true) s3sink.writeFrame(frameToWrite)
    • El valor de formato glueparquet es un método heredado para habilitar el escritor de Parquet de AWS Glue.

  • Si updateBehavior se configura en LOG, las nuevas particiones solo se agregarán si el esquema de DynamicFrame es igual al subconjunto de las columnas definidas en el esquema de la tabla del Data Catalog, o si contiene un subconjunto de este tipo.

  • Las actualizaciones de esquema no son compatibles con las tablas sin particiones (no se utiliza la opción "PartitionKeys").

  • Las claves de partición deben ser equivalentes y estar en el mismo orden en el parámetro que se transfirió en el script de ETL y en las claves de partición del esquema de la tabla del catálogo de datos.

  • Esta característica aún no admite la actualización/creación de tablas en las que se anidan los esquemas de actualización (por ejemplo, matrices dentro de estructuras).

Para obtener más información, consulte Programación de scripts de Spark.