Creación de tablas, actualización de esquema y agregado de nuevas particiones en el catálogo de datos desde trabajos de ETL de AWS Glue - AWS Glue

Creación de tablas, actualización de esquema y agregado de nuevas particiones en el catálogo de datos desde trabajos de ETL de AWS Glue

Un trabajo de extracción, transformación y carga (ETL) podría crear nuevas particiones de la tabla en el almacén de datos de destino. El esquema del conjunto de datos puede evolucionar y ser cada vez más diferente del esquema del AWS Glue Data Catalog. AWS Glue Los trabajos de ETL ahora cuentan con varias características que se pueden utilizar dentro del script de ETL para actualizar el esquema y las particiones del Data Catalog. Estas características permiten ver los resultados del trabajo de ETL en el Data Catalog sin necesidad de volver a ejecutar el rastreador.

Nuevas particiones

Si desea ver las nuevas particiones del AWS Glue Data Catalog, puede realizar una de las siguientes acciones:

  • Cuando el trabajo termine, vuelva a ejecutar el rastreador y, una vez que finalice, consulte las nuevas particiones en la consola.

  • Cuando el trabajo termine, podrá ver las nuevas particiones en la consola de inmediato sin necesidad de volver a ejecutar el rastreador. Puede habilitar esta característica agregando algunas líneas de código al script ETL, tal y como se muestra en los siguientes ejemplos. El código utiliza el argumento enableUpdateCatalog para indicar que el Data Catalog se va a actualizar durante la ejecución del trabajo a medida que se creen las nuevas particiones.

Método 1

Pase enableUpdateCatalog y partitionKeys en un argumento de opciones.

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
Método 2

Pase enableUpdateCatalog y partitionKeys en getSink() y llame a setCatalogInfo() en el objeto DataSink.

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

Ahora, puede crear nuevas tablas de catálogo, actualizar las tablas existentes con un esquema modificado y agregar nuevas particiones de tabla en el Data Catalog utilizando únicamente un trabajo de ETL de AWS Glue, sin necesidad de volver a ejecutar los rastreadores.

Actualización del esquema de la tabla

Si desea sobrescribir el esquema de la tabla del Data Catalog, puede realizar una de las siguientes acciones:

  • Cuando termine el trabajo, vuelva a ejecutar el rastreador y asegúrese de que esté configurado para actualizar también la definición de la tabla. Cuando el rastreador finalice, consulte las nuevas particiones en la consola junto con las actualizaciones del esquema. Para obtener más información, consulte Configuración de un rastreador con la API.

  • Cuando el trabajo termine, podrá ver de inmediato el esquema modificado en la consola sin necesidad de volver a ejecutar el rastreador. Puede habilitar esta característica agregando algunas líneas de código al script ETL, tal y como se muestra en los siguientes ejemplos. El código utiliza el valor enableUpdateCatalog establecido en verdadero y también el valor updateBehavior establecido en UPDATE_IN_DATABASE, lo que indica que el esquema debe sobrescribirse y deben agregarse nuevas particiones en el Data Catalog durante la ejecución del trabajo.

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

También puede establecer el valor updateBehavior en LOG si no desea que el esquema de la tabla se sobrescriba pero sí quiere agregar las nuevas particiones. El valor predeterminado de updateBehavior es UPDATE_IN_DATABASE, por lo que, si no lo define explícitamente, se sobrescribirá el esquema de la tabla.

Si enableUpdateCatalog no se establece en verdadero, independientemente de la opción seleccionada en updateBehavior, el trabajo de ETL no actualizará la tabla del Data Catalog.

Creación de nuevas tablas

También puede utilizar las mismas opciones para crear una nueva tabla en el Data Catalog. Puede especificar la base de datos y el nuevo nombre de la tabla con setCatalogInfo.

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

Restricciones

Tome nota de las siguientes restricciones:

  • Solo se soportan los destinos de Amazon Simple Storage Service (Amazon S3).

  • Solo se admiten los siguientes formatos: json, csv, avro y parquet.

  • Para crear o actualiza tablas con la clasificación parquet, debe utilizar el escritor de parquet optimizado para DynamicFrames de AWS Glue. Esto puede lograrse de tres maneras:

    • Llame a write_dynamic_frame_from_catalog() y, a continuación, configure la propiedad de tabla de useGlueParquetWriter en verdadero en la tabla que está actualizando.

    • Llame a getSink() en su script con connection_type="s3", luego configure el formato a glueparquet.

    • Llame a getSink() en su script con connection_type="s3", luego configure su formato en parquet y transfiera una propiedad useGlueParquetWriter como verdadera en su format_options, esto es especialmente útil para crear nuevas tablas de parquet.

  • Si updateBehavior se configura en LOG, las nuevas particiones solo se agregarán si el esquema de DynamicFrame es igual al subconjunto de las columnas definidas en el esquema de la tabla del Data Catalog, o si contiene un subconjunto de este tipo.

  • Las claves de partición deben ser equivalentes y estar en el mismo orden en el parámetro que se transfirió en el script de ETL y en las claves de partición del esquema de la tabla del catálogo de datos.

  • Esta característica aún no admite la actualización/creación de tablas en las que se anidan los esquemas de actualización (por ejemplo, matrices dentro de estructuras).

Para obtener más información, consulte . Programación de scripts de ETL.