使用 AWS Glue ETL 工作更新結構描述,並在「資料目錄」中新增分割區 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 AWS Glue ETL 工作更新結構描述,並在「資料目錄」中新增分割區

您的擷取、轉換和載入 (ETL) 任務可能會在目標資料存放區中建立新的資料表分割區。您的資料集結構描述可能會隨著時間演進,而與 AWS Glue 資料目錄結構描述有所不同。AWS GlueETL 任務目前提供數個功能,您可以在 ETL 指令碼中使用這些功能來更新資料目錄中的 ETL 結構描述和分割區。這些功能可讓您在資料目錄中查看 ETL 運作的結果,無需重新執行爬蟲程式。

新分割區

如果您想要檢視中的新分割區 AWS Glue Data Catalog,您可以執行下列其中一個動作:

  • 任務完成後,重新執行爬蟲程式,並在爬蟲程式完成後在主控台上檢視新的分割區。

  • 任務完成後,立即在主控台上檢視新的分割區,而不重新執行爬蟲程式。您可以在 ETL 指令碼中加入幾行程式碼,以啟用此功能,如下列範例所示。程式碼會使用 enableUpdateCatalog 引數來指出在資料目錄建立新分割區時,要在任務執行期間更新。

方法 1

以選項引數傳遞 enableUpdateCatalogpartitionKeys

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
方法 2

getSink() 傳遞 enableUpdateCatalogpartitionKeys,並呼叫 DataSink 物件上的 setCatalogInfo()

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

現在,您可以建立新的目錄資料表、使用修改的結構描述更新現有資料表,並使用 AWS Glue ETL 任務在資料目錄新增新的資料表分割區,而無需重新執行爬蟲程式。

更新資料表結構描述

如果您想覆寫資料目錄資料表的結構描述,可以執行以下任一作業:

  • 任務完成後,重新執行爬蟲程式,並確定您的爬蟲程式已設定為更新資料表定義。當爬蟲程式完成時,檢視主控台上的新分割區以及任何結構描述更新。如需詳細資訊,請參閱使用 API 設定爬蟲程式

  • 任務完成後,立即在主控台上檢視修改的結構描述,無需重新執行爬蟲程式。您可以在 ETL 指令碼中加入幾行程式碼,以啟用此功能,如下列範例所示。程式碼使用設為 true 的 enableUpdateCatalog,並同時將 updateBehavior 設為 UPDATE_IN_DATABASE,表示在任務執行期間,在資料目錄中覆寫結構描述並新增新的分割區。

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

如果您想防止資料表結構描述遭到覆寫,但仍想要新增新的分割區,您也可以將 updateBehavior 值設為 LOGupdateBehavior 的預設值為 UPDATE_IN_DATABASE,因此,如果您未明確定該值,則會覆寫資料表結構描述。

如果 enableUpdateCatalog 未設為 true,無論針對 updateBehavior 選取哪個選項,ETL 任務都不會在資料目錄中更新資料表。

建立新的資料表

您也可以使用相同選項,在資料目錄中建立新資料表。您可以使用 setCatalogInfo 指定資料庫和新的資料表名稱。

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

限制

請注意下列限制:

  • 僅支援 Amazon Simple Storage Service (Amazon S3) 的目標。

  • 控管的資料表不支援 enableUpdateCatalog 功能。

  • 僅支援下列格式:jsoncsvavroparquet

  • 若要使用parquet分類建立或更新表格,您必須使用AWS Glue最佳化的實木複合地板寫入器 DynamicFrames。這可以透過以下方法之一來達成:

    • 如果要使用 parquet 分類來更新型錄中的現有資料表,則在更新之前,資料表的 "useGlueParquetWriter" 資料表屬性必須設定為 true。您可以透過 AWS Glue API/SDK、主控台或透過 Athena DDL 陳述式來設定此屬性。

      AWS Glue 主控台中的目錄表格屬性編輯欄位。

      設定型錄資料表屬性後,您便可以使用以下程式碼片段,以新資料更新型錄資料表:

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • 如果型錄中尚不存在資料表,您可以在指令碼中使用 getSink() 方法和 connection_type="s3" 將資料表及其分割區新增至型錄中,並將資料寫入 Amazon S3。為您的工作流程提供適當的 partitionKeyscompression

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=true) s3sink.writeFrame(frameToWrite)
    • glueparquet格式值是啟用實 AWS Glue 木複合地板寫入器的傳統方法。

  • updateBehavior 設為 LOG 後,只有在 DynamicFrame 結構描述等同於或包含資料目錄資料表結構描述中定義之欄的子集時,才會新增新的分割區。

  • 未分割資料表不支援結構描述更新 (不使用 "partitionKeys" 選項)。

  • 在 ETL 指令碼中傳遞的參數和資料目錄資料表結構描述中的 partitionKeys 之間,您的 partitionKeys 必須等效,而且順序相同。

  • 此功能目前尚不支援更新/建立更新巢狀結構描述的資料表 (例如,結構內的陣列)。

如需更多詳細資訊,請參閱 Spark 指令碼程式設計