使用任務書籤 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用任務書籤

AWS Glue for Spark 使用任務書籤追蹤已處理的資料。如需任務書籤功能及其支援內容的摘要,請參閱 使用任務書籤追蹤處理的資料。使用書籤對 AWS Glue 任務進行程式設計時,您可以取得視覺化任務中無法使用的彈性。

  • 從 JDBC 讀取時,您可以在 AWS Glue 指令碼中指定要作為書籤索引鍵使用的資料欄。

  • 您可以選擇要將哪個 transformation_ctx 套用至每個方法呼叫。

job.init在腳本的開頭調用,並使用適當配置的參數job.commit在腳本的末尾調用。這兩個函數會初始化書籤服務並更新服務的狀態變更。如果沒有呼叫書籤,書籤將不會運作。

指定書籤索引鍵

對於 JDBC 工作流程,書籤會透過比較索引鍵欄位的值與書籤值,追蹤任務已讀取的資料列。對於 Amazon S3 工作流程而言,這不是必需的程序,也不適用。在不使用視覺化編輯器的情況下撰寫 AWS Glue 指令碼時,您可以指定要使用書籤追蹤的資料欄。您也可以指定多個資料欄。指定使用者定義的書籤索引鍵時,允許值序列中存在間隙。

警告

如果使用的是使用者定義書籤索引鍵,則均須嚴格單調增加或減少。為複合索引鍵選取其他欄位時,「次要版本」或「修訂編號」等概念的欄位不符合此條件,因為這些欄位的值會在整個資料集中重複使用。

您可採用以下方式來指定 jobBookmarkKeysjobBookmarkKeysSortOrder

  • create_dynamic_frame.from_catalog – 請使用 additional_options

  • create_dynamic_frame.from_options – 請使用 connection_options

轉換內容

許多AWS Glue PySpark 動態影格方法都包含名為的選用參數transformation_ctx,這是 ETL 運算子實體的唯一識別碼。transformation_ctx 參數用來識別指定運算子之任務書籤內的狀態資訊。具體而言,AWS Glue 使用 transformation_ctx 來編製書籤狀態之索引鍵的索引。

警告

transformation_ctx 作為在書籤狀態中搜索指令碼中特定來源的鍵。為了讓書籤正常運作,您應始終確保來源和相關聯 transformation_ctx 的一致性。變更來源屬性或重新命名 transformation_ctx 可能會使之前的書籤無效,且基於時間戳記的篩選條件可能無法產生正確的結果。

為了讓任務書籤正常運作,請啟用任務書籤參數,並設定 transformation_ctx 參數。如果您未傳入 transformation_ctx 參數,則不會針對方法中使用的動態框架或資料表啟用任務書籤。例如,如果您有一個讀取和加入兩個 Amazon S3 來源的 ETL 任務,您可以選擇將 transformation_ctx 參數僅傳入您想要啟用書籤的這些方法。如果您重設任務的任務書籤,則不論所使用的 transformation_ctx 為何,系統都會重設所有與任務建立關聯的轉換。

如需 DynamicFrameReader 類別的詳細資訊,請參閱DynamicFrameReader 類。如需副檔名的詳細資 PySpark 訊,請參閱AWS Glue PySpark 延伸模組參考

範例

以下是針對 Amazon S3 資料來源產生指令碼的範例。使用任務書籤所需的指令碼部分會以斜體顯示。如需這些元素的詳細資訊,請參閱 GlueContext 類 API 和 DynamicFrameWriter 類別 API。

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()

以下是針對 JDBC 來源產生指令碼的範例。來源資料表是 empno 欄為主要金鑰的員工資料表。雖然根據預設,如果有指定書籤金鑰,則任務會使用序列主要金鑰做為書籤金鑰,因為 empno 不一定是按序排列值之間可能有間隙不符合預設書籤金鑰的資格。因此,指令碼會明確地指定 empno 為書籤金鑰。程式碼的該部分會以斜體顯示。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()