GlueContext 類 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

GlueContext 類

包裝 Apache 的星火SparkContext對象,從而提供了與 Apache 星火平台進行交互的機制。

__init__

__init__(sparkContext)
  • sparkContext – 欲使用的 Apache Spark 細節。

正在建立

getSource

getSource(connection_type, transformation_ctx = "", **options)

建立可用於從外部來源讀取 DynamicFramesDataSource 物件。

  • connection_type – 要使用的連線類型,例如 Amazon Simple Storage Service (Amazon S3)、Amazon Redshift 及 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracledynamodb

  • transformation_ctx – 欲使用的轉換細節 (選用)。

  • options – 選擇性的名稱/值對的集合。如需詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項

以下是 getSource 的使用範例:

>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()

create_dynamic_frame_from_rdd

create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")

傳回從 Apache Spark 彈性分散式資料集 (RDD) 建立的 DynamicFrame

  • data – 欲使用的資料來源。

  • name – 欲使用的資料名稱。

  • schema – 欲使用的結構描述 (選用)。

  • sample_ratio – 欲使用的取樣率 (選用)。

  • transformation_ctx – 欲使用的轉換細節 (選用)。

create_dynamic_frame_from_catalog

create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)

傳回使用 Data Catalog 資料庫和資料表名稱建立的 DynamicFrame。使用此方法時,您可以透format_optionsadditional_options引數在指定的 AWS Glue 資料目錄表格上提供透過資料表屬性,以及其他選項。

  • Database – 欲讀取的資料庫。

  • table_name – 欲讀取的資料表的名稱。

  • redshift_tmp_dir – 所要使用的 Amazon Redshift 暫時目錄 (選用)。

  • transformation_ctx – 欲使用的轉換細節 (選用)。

  • push_down_predicate – 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需支援的來源和限制,請參閱 AWS Glue ETL 中的透過下推將讀取最佳化。如需詳細資訊,請參閱 使用 pushdown 述詞預先篩選

  • additional_options – 選擇性的名稱/值對的集合。可能的選項包括 AWS Glue for Spark 中 ETL 的連線類型和選項 中列出的項目,除了 endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassification 以及delimiter。另一個支援的選項是 catalogPartitionPredicate

    catalogPartitionPredicate — 您可以傳遞目錄表達式以根據索引欄進行篩選。這會將篩選下推至伺服器端。如需詳細資訊,請參閱 AWS Glue 分割區索引。注意 push_down_predicatecatalogPartitionPredicate 使用不同的語法。前者使用 Spark SQL 標準語法,後者使用 JSQL 剖析器。

  • catalog_id — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。若無,會使用發起人的預設帳戶 ID。

create_dynamic_frame_from_options

create_dynamic_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

傳回使用指定的連線和格式建立的 DynamicFrame

  • connection_type – 連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracledynamodb

  • connection_options - 連線選項,例如路徑和資料庫資料表 (選用)。如果是 connection_types3,會定義 Amazon S3 路徑清單。

    connection_options = {"paths": ["s3://aws-glue-target/temp"]}

    如果是 JDBC 連線,必須定義幾項屬性。請注意,資料庫名稱必須是 URL 的一部分。它可以選擇性包含在連線選項中。

    警告

    不建議在指令碼中存放密碼。考慮使boto3用從 AWS Secrets Manager 或 AWS Glue 資料型錄擷取它們。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    dbtable 屬性為 JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定 schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。

    如需詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項

  • format— 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • format_options – 指定格式的格式選項。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • transformation_ctx – 欲使用的轉換細節 (選用)。

  • push_down_predicate – 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需支援的來源和限制,請參閱 AWS Glue ETL 中的透過下推將讀取最佳化。如需詳細資訊,請參閱使用 Pushdown 述詞預先篩選

create_sample_dynamic_frame_from_catalog

create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)

傳回使用 Data Catalog 資料庫和資料表名稱建立的範例 DynamicFrameDynamicFrame 僅包含來自資料來源的第一個 num 記錄。

  • database – 欲讀取的資料庫。

  • table_name – 欲讀取的資料表的名稱。

  • num – 傳回的範例動態框架中記錄的最大數目。

  • redshift_tmp_dir:所要使用的 Amazon Redshift 臨時目錄 (選用)。

  • transformation_ctx – 欲使用的轉換細節 (選用)。

  • push_down_predicate – 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需詳細資訊,請參閱 使用 pushdown 述詞預先篩選

  • additional_options – 選擇性的名稱/值對的集合。可能的選項包括 AWS Glue for Spark 中 ETL 的連線類型和選項 中列出的項目,除了 endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassification 以及delimiter

  • sample_options – 用於控制取樣行為的參數 (選用)。Amazon S3 來源的目前可用參數:

    • maxSamplePartitions – 取樣將讀取的分割區數目上限。預設值為 10

    • maxSampleFilesPerPartition – 取樣將在一個分割區中讀取的檔案數目上限。預設值為 10。

      這些參數有助於減少檔案清單所耗用的時間。例如,假設資料集有 1000 個分割區,並且每個分割區都有 10 個檔案。如果您設定 maxSamplePartitions = 10 和 maxSampleFilesPerPartition = 10,而不是列出所有 10,000 個檔案,而是僅列出和讀取前 10 個分割區及每個分割區的前 10 個檔案 (總計為 10*10 = 100 個檔案)。

  • catalog_id – 要存取之 Data Catalog 的目錄 ID ( Data Catalog 的帳戶 ID)。依預設設定為 NoneNone 預設為服務中呼叫帳戶的目錄 ID。

create_sample_dynamic_frame_from_options

create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")

傳回使用指定的連線和格式建立的範例 DynamicFrameDynamicFrame 僅包含來自資料來源的第一個 num 記錄。

  • connection_type – 連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracledynamodb

  • connection_options - 連線選項,例如路徑和資料庫資料表 (選用)。如需詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項

  • num – 傳回的範例動態框架中記錄的最大數目。

  • sample_options – 用於控制取樣行為的參數 (選用)。Amazon S3 來源的目前可用參數:

    • maxSamplePartitions – 取樣將讀取的分割區數目上限。預設值為 10

    • maxSampleFilesPerPartition – 取樣將在一個分割區中讀取的檔案數目上限。預設值為 10。

      這些參數有助於減少檔案清單所耗用的時間。例如,假設資料集有 1000 個分割區,並且每個分割區都有 10 個檔案。如果您設定 maxSamplePartitions = 10 和 maxSampleFilesPerPartition = 10,而不是列出所有 10,000 個檔案,而是僅列出和讀取前 10 個分割區及每個分割區的前 10 個檔案 (總計為 10*10 = 100 個檔案)。

  • format— 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • format_options – 指定格式的格式選項。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • transformation_ctx – 欲使用的轉換細節 (選用)。

  • push_down_predicate – 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需詳細資訊,請參閱 使用 pushdown 述詞預先篩選

add_ingestion_time_columns

add_ingestion_time_columns(dataFrame, timeGranularity = "")

附加擷取時間欄 (如 ingest_yearingest_monthingest_dayingest_houringest_minute) 到輸入 DataFrame。當您指定以 Amazon S3 為目標的 Data Catalog 資料表時,此函數會在 AWS Glue 產生的指令碼中自動產生。此函數會自動使用輸出資料表上的擷取時間欄來更新分割區。這可讓輸出資料在擷取時間自動分割,而不需要輸入資料中的明確擷取時間欄。

  • dataFrame – 要將擷取時間欄附加到的 dataFrame

  • timeGranularity – 時間欄的精密程度。有效值為 "day"、"hour" 和 "minute"。例如:如果 "hour" 被傳遞給函數,原始 dataFrame 會附加上 "ingest_year"、"ingest_month"、"ingest_day" 和 "ingest_hour" 時間欄。

傳回附加時間粒度欄後的資料框架。

範例:

dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))

create_data_frame_from_catalog

create_data_frame_from_catalog(database, table_name, transformation_ctx = "", additional_options = {})

傳回使用 Data Catalog 資料表的資訊建立的 DataFrame

  • database – 要從中讀取的 Data Catalog 資料庫。

  • table_name – 要從中讀取的 Data Catalog 資料表的名稱。

  • transformation_ctx – 欲使用的轉換細節 (選用)。

  • additional_options – 選擇性的名稱/值對的集合。可能的選項包括 AWS Glue for Spark 中 ETL 的連線類型和選項 中列出用於串流來源的項目,例如 startingPositionmaxFetchTimeInMs 以及 startingOffsets

    • useSparkDataSource— 當設定為 true 時,強制 AWS Glue 使用原生 Spark 資料來源 API 來讀取資料表。Spark Data Source API 支援下列格式:AVRO、二進位、CSV、JSON、ORC、Parquet 和文字。在 Data Catalog 資料表中,您可以使用 classification 屬性指定格式。若要進一步了解 Spark Data Source API,請參閱官方 Apache Spark 文件

      create_data_frame_from_cataloguseSparkDataSource 一起使用具有以下好處:

      • 直接傳回 DataFrame 並提供 create_dynamic_frame.from_catalog().toDF() 的替代方案。

      • 支援原生格式的 AWS Lake Formation 表格層級權限控制。

      • 支援讀取資料湖格式,無需 AWS Lake Formation 表格層級權限控制。如需詳細資訊,請參閱 將資料湖架構與 AWS Glue ETL 任務搭配使用

      啟用時useSparkDataSource,您也可以視需要在additional_options中新增任何 Spark 資料來源選項。 AWS Glue 將這些選項直接傳遞給 Spark 閱讀器。

    • useCatalogSchema— 設定為 true 時, AWS Glue 會將資料目錄結構描述套用至產生的結果DataFrame。否則,讀取器會從資料推斷結構描述。啟用 useCatalogSchema 時,也必須將 useSparkDataSource 設定為 true。

限制

使用 useSparkDataSource 選項時請考慮以下限制:

  • 當您使用時useSparkDataSource, AWS Glue 會DataFrame在不同於原始 Spark 工作階段的個別 Spark 工作階段中建立新的。

  • Spark DataFrame 分區過濾不適用於以下 AWS Glue 功能。

    若要搭配這些功能使用分割區篩選,您可以使用 AWS Glue 下拉述詞。如需詳細資訊,請參閱 使用 pushdown 述詞預先篩選。篩選未分割資料欄不會受到影響。

    下列範例指令碼示範使用 excludeStorageClasses 選項執行分割區篩選的不正確方法。

    // Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")

    下列範例指令碼示範使用 excludeStorageClasses 選項,利用下推述詞來執行分割區篩選的正確方法。

    // Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")

範例:使用 Spark Data Source 讀取器來建立 CSV 資料表

// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=<database_name>, table_name=<table_name>, additional_options = {"useSparkDataSource": True, "sep": '\t'} )

create_data_frame_from_options

create_data_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

此 API 現已棄用。請改用 getSource() API。傳回使用指定的連線和格式建立的 DataFrame。這個函數只能用於 AWS Glue 串流來源。

  • connection_type - 串流連線類型。有效值包括 kinesiskafka

  • connection_options— 連線選項,這些選項對於 Kinesis 和 Kafka 而言是不同的。您可以在 AWS Glue for Spark 中 ETL 的連線類型和選項 中找到每個串流資料來源的所有連線選項清單。請注意串流連線選項的下列不同處:

    • Kinesis 串流來源需要 streamARNstartingPositioninferSchema 以及 classification

    • Kafka 串流來源需要 connectionNametopicNamestartingOffsetsinferSchema 以及 classification

  • format— 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。如需有關支援格式的資訊,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項

  • format_options – 指定格式的格式選項。如需支援格式選項的詳細資訊,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項

  • transformation_ctx – 欲使用的轉換細節 (選用)。

Amazon Kinesis 串流來源範例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Kafka 串流來源範例:

kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

forEachBatch

forEachBatch(frame, batch_function, options)

將傳入的 batch_function 套用至從串流來源讀取的每個微批次。

  • frame— 包 DataFrame 含當前微批次。

  • batch_function – 將套用至每個微批次的函數。

  • options – 索引鍵/值配對的集合,其中包含如何處理微批次的相關資訊。下列選項是必要的:

    • windowSize – 處理每個批次的時間量。

    • checkpointLocation - 串流 ETL 任務的檢查點儲存位置。

    • batchMaxRetries – 如果失敗,可重試批次的次數上限。預設值為 3。此選項僅在 Glue 2.0 及以上版本上才可設定。

範例:

glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )

在 Amazon S3 中使用資料集

purge_table

purge_table(catalog_id=None, database="", table_name="", options={}, transformation_ctx="")

從 Amazon S3 中刪除指定目錄資料庫和資料表的檔案。如果刪除分割區中的所有檔案,該分割區也會從目錄中刪除。

如果您希望能夠復原已刪除的物件,您可以在 Amazon S3 儲存貯體上開啟物件版本控制。從未啟用物件版本控制的儲存貯體中刪除物件時,無法復原物件。如需如何復原已啟用版本控制之儲存貯體中已刪除物件的詳細資訊,請參閱 AWS Support 知識中心的如何擷取已刪除的 Amazon S3 物件?

  • catalog_id – 要存取之 Data Catalog 的目錄 ID ( Data Catalog 的帳戶 ID)。依預設設定為 NoneNone 預設為服務中呼叫帳戶的目錄 ID。

  • database – 所要使用的資料庫。

  • table_name - 要使用的資料表名稱。

  • options - 篩選要刪除之檔案和用於產生資訊清單檔案的選項。

    • retentionPeriod - 指定保留檔案的期間 (以小時為單位)。比保留期間新的檔案都會予以保留。依預設設定為 168 小時 (7 天)。

    • partitionPredicate - 滿足此述詞的分割區會被刪除。這些分割區中仍在保留期間內的檔案不會被刪除。設定為 "" – 預設為空值。

    • excludeStorageClasses - 不會刪除 excludeStorageClasses 集合中具有儲存體方案的檔案。預設為 Set() – 空集合。

    • manifestFilePath - 產生資訊清單檔案的選用路徑。所有已成功清除的檔案都會記錄在 Success.csv 中,失敗的則記錄在 Failed.csv

  • transformation_ctx – 欲使用的轉換細節 (選用)。用於資訊清單檔案的路徑。

glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

purge_s3_path

purge_s3_path(s3_path, options={}, transformation_ctx="")

以遞迴方式刪除指定 Amazon S3 路徑中的檔案。

如果您希望能夠復原已刪除的物件,您可以在 Amazon S3 儲存貯體上開啟物件版本控制。從未開啟物件版本控制的儲存貯體中刪除物件時,無法復原物件。如需如何使用版本控制復原儲存貯體中已刪除物件的詳細資訊,請參閱如何擷取已刪除的 Amazon S3 物件? 在 AWS Support 知識中心。

  • s3_path - 要刪除之檔案的 Amazon S3 路徑,格式為 s3://<bucket>/<prefix>/

  • options - 篩選要刪除之檔案和用於產生資訊清單檔案的選項。

    • retentionPeriod - 指定保留檔案的期間 (以小時為單位)。比保留期間新的檔案都會予以保留。依預設設定為 168 小時 (7 天)。

    • excludeStorageClasses - 不會刪除 excludeStorageClasses 集合中具有儲存體方案的檔案。預設為 Set() – 空集合。

    • manifestFilePath - 產生資訊清單檔案的選用路徑。所有已成功清除的檔案都會記錄在 Success.csv 中,失敗的則記錄在 Failed.csv

  • transformation_ctx – 欲使用的轉換細節 (選用)。用於資訊清單檔案的路徑。

glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

transition_table

transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)

針對指定之目錄的資料庫和資料表,轉換儲存在 Amazon S3 上之檔案的儲存體方案。

您可以在任意兩個儲存體方案之間轉換。對於 GLACIERDEEP_ARCHIVE 儲存體方案,您可以轉換到這些方案。但是,您可以使用 S3 RESTOREGLACIERDEEP_ARCHIVE 儲存體方案轉換。

如果您執行的 AWS Glue ETL 任務會從 Amazon S3 讀取檔案或分割區,則您可排除部分 Amazon S3 儲存類別類型。如需詳細資訊,請參閱排除 Amazon S3 儲存體方案

  • database – 所要使用的資料庫。

  • table_name - 要使用的資料表名稱。

  • transition_to – 要轉移的 Amazon S3 儲存方案

  • options - 篩選要刪除之檔案和用於產生資訊清單檔案的選項。

    • retentionPeriod - 指定保留檔案的期間 (以小時為單位)。比保留期間新的檔案都會予以保留。依預設設定為 168 小時 (7 天)。

    • partitionPredicate - 滿足此述詞的分割區會被轉換。這些分割區中仍在保留期間內的檔案不會被轉換。設定為 "" – 預設為空值。

    • excludeStorageClasses - 不會轉換 excludeStorageClasses 集合中具有儲存體方案的檔案。預設為 Set() – 空集合。

    • manifestFilePath - 產生資訊清單檔案的選用路徑。所有已成功轉換的檔案都會記錄在 Success.csv 中,失敗的則記錄在 Failed.csv

    • accountId – 要執行轉移轉換的 Amazon Web Services 帳戶 ID。對於這種轉換是強制性的。

    • roleArn— 執行轉換轉換的 AWS 角色。對於這種轉換是強制性的。

  • transformation_ctx – 欲使用的轉換細節 (選用)。用於資訊清單檔案的路徑。

  • catalog_id – 要存取之 Data Catalog 的目錄 ID ( Data Catalog 的帳戶 ID)。依預設設定為 NoneNone 預設為服務中呼叫帳戶的目錄 ID。

glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

transition_s3_path

transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")

以遞迴方式轉換指定 Amazon S3 路徑中檔案的儲存體方案。

您可以在任意兩個儲存體方案之間轉換。對於 GLACIERDEEP_ARCHIVE 儲存體方案,您可以轉換到這些方案。但是,您可以使用 S3 RESTOREGLACIERDEEP_ARCHIVE 儲存體方案轉換。

如果您執行的 AWS Glue ETL 任務會從 Amazon S3 讀取檔案或分割區,則您可排除部分 Amazon S3 儲存類別類型。如需詳細資訊,請參閱排除 Amazon S3 儲存體方案

  • s3_path - 要以格式 s3://<bucket>/<prefix>/ 轉換之檔案的 Amazon S3 路徑。

  • transition_to – 要轉移的 Amazon S3 儲存方案

  • options - 篩選要刪除之檔案和用於產生資訊清單檔案的選項。

    • retentionPeriod - 指定保留檔案的期間 (以小時為單位)。比保留期間新的檔案都會予以保留。依預設設定為 168 小時 (7 天)。

    • partitionPredicate - 滿足此述詞的分割區會被轉換。這些分割區中仍在保留期間內的檔案不會被轉換。設定為 "" – 預設為空值。

    • excludeStorageClasses - 不會轉換 excludeStorageClasses 集合中具有儲存體方案的檔案。預設為 Set() – 空集合。

    • manifestFilePath - 產生資訊清單檔案的選用路徑。所有已成功轉換的檔案都會記錄在 Success.csv 中,失敗的則記錄在 Failed.csv

    • accountId – 要執行轉移轉換的 Amazon Web Services 帳戶 ID。對於這種轉換是強制性的。

    • roleArn— 執行轉換轉換的 AWS 角色。對於這種轉換是強制性的。

  • transformation_ctx – 欲使用的轉換細節 (選用)。用於資訊清單檔案的路徑。

glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

擷取

extract_jdbc_conf

extract_jdbc_conf(connection_name, catalog_id = None)

從 Data Catalog 中的 AWS Glue 連線物件傳回含索引鍵 (具有組態屬性) 的 dict

  • user:資料庫使用者名稱。

  • password:資料庫密碼。

  • vendor:指定廠商 (mysqlpostgresqloraclesqlserver 等)。

  • enforceSSL:布林字串,指示是否需要安全連線。

  • customJDBCCert:使用指定 Amazon S3 路徑中的特定用戶端憑證。

  • skipCustomJDBCCertValidation:布林字串,指示 customJDBCCert 必須由 CA 驗證。

  • customJDBCCertString:有關自訂憑證的其他資訊,因驅動程式類型而異。

  • url:(已棄用) 僅包含通訊協定、伺服器和連接埠的 JDBC URL。

  • fullUrl:建立連線時輸入的 JDBC URL (適用於 AWS Glue 3.0 版或更新版本)。

擷取 JDBC 組態的範例:

jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}

交易

start_transaction

start_transaction(read_only)

開始新交易。內部呼叫 Lake Formation startTransaction API。

  • read_only – (布林值) 指出此交易應該是唯讀,還是讀取和寫入。使用唯讀交易 ID 進行的寫入將被拒絕。唯讀交易不需要遞交。

傳回交易 ID。

commit_transaction

commit_transaction(transaction_id, wait_for_commit = True)

嘗試遞交指定的交易。commit_transaction 可能會在交易完成遞交之前返回。內部呼叫 Lake Formation commitTransaction API。

  • transaction_id – (字串) 要遞交的交易。

  • wait_for_commit – (布林值) 決定 commit_transaction 是否立即傳回。預設值為 true。如為 False,commit_transaction 輪詢並等待,直到交易完成遞交。使用指數退避時,等待時間長度限制為 1 分鐘,最多可嘗試 6 次重試。

傳回一個布林值,指示遞交是否完成。

cancel_transaction

cancel_transaction(transaction_id)

嘗試取消指定的交易。如果交易先前已遞交,傳回 TransactionCommittedException 例外狀況。內部調用 Lake Formation CancelTransactionAPI。

  • transaction_id – (字串) 要取消的交易。

寫入

getSink

getSink(connection_type, format = None, transformation_ctx = "", **options)

取得可用於將 DynamicFrames 寫入外部來源的 DataSink 物件。請先檢查 SparkSQL format 以確保取得預期的目的地。

  • connection_type – 要使用的連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括s3mysqlpostgresql、、redshiftsqlserveroraclekinesis、和kafka

  • format – 要使用的 SparkSQL 格式 (選用)。

  • transformation_ctx – 欲使用的轉換細節 (選用)。

  • options – 名稱/值對的集合,用來指定連線選項。一些可能的值為:

    • userpassword:適用於授權

    • url:資料存放區的端點

    • dbtable:目標資料表的名稱。

    • bulkSize:插入操作的平行程度

您可以指定的選項取決於連線類型。如需其他值和範例,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項

範例:

>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)

write_dynamic_frame_from_options

write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

使用指定的連線和格式來撰寫並傳回 DynamicFrame

  • frame – 所要撰寫的 DynamicFrame

  • connection_type – 連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括s3mysqlpostgresql、、redshiftsqlserveroraclekinesis、和kafka

  • connection_options – 連線選項,例如路徑和資料庫資料表 (選用)。如果是 connection_types3,會定義 Amazon S3 路徑。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    如果是 JDBC 連線,必須定義幾項屬性。請注意,資料庫名稱必須是 URL 的一部分。它可以選擇性包含在連線選項中。

    警告

    不建議在指令碼中存放密碼。考慮使boto3用從 AWS Secrets Manager 或 AWS Glue 資料型錄擷取它們。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    dbtable 屬性為 JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定 schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。

    如需詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項

  • format— 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • format_options – 指定格式的格式選項。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • transformation_ctx – 所要使用的轉換細節 (選用)。

write_from_options

write_from_options(frame_or_dfc, connection_type, connection_options={}, format={}, format_options={}, transformation_ctx = "")

寫入和傳回以指定的連線和格式資訊建立的 DynamicFrameDynamicFrameCollection

  • frame_or_dfc – 所要撰寫的 DynamicFrameDynamicFrameCollection

  • connection_type – 連線類型,例如 Amazon S3、Amazon Redshift 及 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracle

  • connection_options – 連線選項,例如路徑和資料庫資料表 (選用)。如果是 connection_types3,會定義 Amazon S3 路徑。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    如果是 JDBC 連線,必須定義幾項屬性。請注意,資料庫名稱必須是 URL 的一部分。它可以選擇性包含在連線選項中。

    警告

    不建議在指令碼中存放密碼。考慮使boto3用從 AWS Secrets Manager 或 AWS Glue 資料型錄擷取它們。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    dbtable 屬性為 JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定 schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。

    如需詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項

  • format— 格式規格。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • format_options – 指定格式的格式選項。請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項 以了解受支援的格式。

  • transformation_ctx – 所要使用的轉換細節 (選用)。

write_dynamic_frame_from_catalog

write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

使用來自 Data Catalog 資料庫和資料表的資訊寫入並傳回 DynamicFrame

  • frame – 所要撰寫的 DynamicFrame

  • Database – 包含資料表的 Data Catalog 資料庫。

  • table_name – 與目標關聯的 Data Catalog 資料表名稱。

  • redshift_tmp_dir – 所要使用的 Amazon Redshift 暫時目錄 (選用)。

  • transformation_ctx – 欲使用的轉換細節 (選用)。

  • additional_options – 選擇性的名稱/值對的集合。

  • catalog_id — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。若無,會使用發起人的預設帳戶 ID。

write_data_frame_from_catalog

write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

使用來自 Data Catalog 資料庫和資料表的資訊寫入並傳回 DataFrame。此方法支援寫入資料湖格式 (Hudi、Iceberg 和 Delta Lake)。如需詳細資訊,請參閱 將資料湖架構與 AWS Glue ETL 任務搭配使用

  • frame – 所要撰寫的 DataFrame

  • Database – 包含資料表的 Data Catalog 資料庫。

  • table_name – 與目標關聯的 Data Catalog 資料表名稱。

  • redshift_tmp_dir:所要使用的 Amazon Redshift 臨時目錄 (選用)。

  • transformation_ctx – 欲使用的轉換細節 (選用)。

  • additional_options – 選擇性的名稱/值對的集合。

    • useSparkDataSink— 當設定為 true 時,強制 AWS Glue 使用原生 Spark 資料接收器 API 寫入資料表。當您啟用此選項時,您可以視需要將任何 Spark 資料來源選項新增至additional_options。 AWS Glue 將這些選項直接傳遞給 Spark 作家。

  • catalog_id – 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。如果您未指定值,則會使用發起人的預設帳戶 ID。

限制

使用 useSparkDataSink 選項時請考慮以下限制:

範例:使用 Spark Data Source 寫入器寫入 Hudi 資料表

hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name': <table_name>, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': <table_name>, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': <database_name>, 'hoodie.datasource.hive_sync.table': <table_name>, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame = <df_product_inserts>, database = <database_name>, table_name = <table_name>, additional_options = hudi_options )

write_dynamic_frame_from_jdbc_conf

write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

使用指定的 JDBC 連線資訊撰寫並傳回 DynamicFrame

  • frame – 所要撰寫的 DynamicFrame

  • catalog_connection – 所要使用的目錄連線。

  • connection_options – 連線選項,例如路徑和資料庫資料表 (選用)。如需詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項

  • redshift_tmp_dir – 所要使用的 Amazon Redshift 暫時目錄 (選用)。

  • transformation_ctx – 所要使用的轉換細節 (選用)。

  • catalog_id — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。若無,會使用發起人的預設帳戶 ID。

write_from_jdbc_conf

write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

使用指定的 JDBC 連線資訊撰寫並傳回 DynamicFrameDynamicFrameCollection

  • frame_or_dfc – 所要撰寫的 DynamicFrameDynamicFrameCollection

  • catalog_connection – 所要使用的目錄連線。

  • connection_options – 連線選項,例如路徑和資料庫資料表 (選用)。如需詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項

  • redshift_tmp_dir – 所要使用的 Amazon Redshift 暫時目錄 (選用)。

  • transformation_ctx – 所要使用的轉換細節 (選用)。

  • catalog_id — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。若無,會使用發起人的預設帳戶 ID。