使用 AWS Glue ETL 任务在 Data Catalog 中更新架构并添加新分区 - AWS Glue

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 AWS Glue ETL 任务在 Data Catalog 中更新架构并添加新分区

您的提取、转换和加载 (ETL) 作业可能会在目标数据存储中创建新的表分区。随着时间推移,您的数据集架构可能演变和偏离 AWS Glue 数据目录架构。AWS GlueETL 任务现在提供了几个功能,您可以在 ETL 脚本中使用这些功能在数据目录中更新架构和分区。这些功能允许您在数据目录中查看 ETL 工作的结果,而无需重新运行爬网程序。

新分区

如果要在 AWS Glue Data Catalog 中查看新分区,可以执行以下操作之一:

  • 在作业完成后,重新运行爬网程序,并在爬网程序完成后,在控制台上查看新分区。

  • 一旦作业完成,即可在控制台上查看新分区,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码使用 enableUpdateCatalog 参数指示数据目录在任务运行期间将随着新分区的创建而更新。

方法 1:

在选项参数中传递 enableUpdateCatalogpartitionKeys

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
方法 2:

getSink() 中传递 enableUpdateCatalogpartitionKeys,并对 DataSink 对象调用 setCatalogInfo()

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

现在,您可以使用 AWS Glue ETL 任务本身创建新的目录表、使用修改的架构更新现有表,并在数据目录中添加新的表分区,而无需重新运行爬网程序。

更新表架构

如果要覆盖数据目录表的架构,可以执行以下操作之一:

  • 作业完成后,重新运行爬网程序,并确保您的爬网程序也已配置为更新表定义。当爬网程序完成时,在查控制台上看新分区以及任何架构更新。有关更多信息,请参阅使用 API 配置爬网程序

  • 一旦作业完成,即可在控制台上查看修改的架构,而无需重新运行爬网程序。可以通过向 ETL 脚本添加几行代码来启用此功能,如以下示例中所示。代码将 enableUpdateCatalog 设置为 true,也将 updateBehavior 设置为 UPDATE_IN_DATABASE,这表示在任务运行期间将覆盖架构并在数据目录中添加新分区。

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

如果您希望防止表架构被覆盖,但仍希望添加新分区,也可以将该 updateBehavior 值设置为 LOGupdateBehavior 的默认值为 UPDATE_IN_DATABASE,因此,如果您没有明确定义它,则表架构将被覆盖。

如果 enableUpdateCatalog 未设置为 true,无论为 updateBehavior 选择哪个选项,ETL 任务都不会更新数据目录中的表。

创建新表

还可以使用相同的选项在数据目录中创建新表。您可以使用 setCatalogInfo 指定数据库和新表名。

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

限制

请注意以下限制:

  • 仅支持 Amazon Simple Storage Service(Amazon S3)目标。

  • 受管辖的表不支持 enableUpdateCatalog 功能。

  • 仅支持以下格式:jsoncsvavroparquet

  • 要使用 parquet 分类创建或更新表,您必须使用 DynamicFrames 针对 AWS Glue 优化的 Parquet 写入器。这可以通过以下一种方法实现:

    • 如果您要使用 parquet 分类更新目录中的现有表,则必须先将表的 "useGlueParquetWriter" 表属性设置为 true,然后才能对其进行更新。您可以通过 AWS Glue API/SDK、控制台或 Athena DDL 语句设置此属性。

      AWS Glue 控制台中的目录表属性编辑字段。

      设置目录表属性后,您可以使用以下代码片段使用新数据更新目录表:

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • 如果目录中尚不存在该表,则可以在脚本中使用 getSink() 方法与 connection_type="s3" 将表及其分区添加到目录中,同时将数据写入 Amazon S3。为您的工作流程提供适当的 partitionKeyscompression

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=true) s3sink.writeFrame(frameToWrite)
    • glueparquet 格式值是启用 AWS Glue Parquet 写入器的传统方法。

  • updateBehavior 设置为 LOG 时,只有当 DynamicFrame 架构等效于或包含在数据目录表的架构中定义的列子集时,才会添加新分区。

  • 非分区表不支持架构更新(未使用“partitionKeys”选项)。

  • 在 ETL 脚本中传递的参数与数据目录表架构中的 partitionKey 之间,您的 partitionKeys 必须是等效的且顺序相同。

  • 此功能目前尚不支持更新/创建嵌套更新架构的表(例如,结构内的数组)。

有关更多信息,请参阅 Spark 脚本编程