本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Kafka 連線
指定連接到 Kafka 叢集或 Amazon Managed Streaming for Apache Kafka 叢集的連線。
您可以使用存儲在「數據目錄」表中的信息或提供信息直接訪問數據流來讀取和寫入卡夫卡數據流。您可以從卡夫卡讀取信息到火花 DataFrame,然後將其轉換為 Glue。 AWS DynamicFrame您可以以 JSON 格式寫信給 DynamicFrames 卡夫卡。如果您直接存取資料串流,則請使用這些選項來提供如何存取資料串流的相關資訊。
如果您使用getCatalogSource
或create_data_frame_from_catalog
使用卡夫卡流源中的記錄,getCatalogSink
或者write_dynamic_frame_from_catalog
將記錄寫入卡夫卡,並且該作業具有數據目錄數據庫和表名信息,則可以使用該信息來獲取從卡夫卡流源讀取一些基本參數。如果使用getSource
、、getCatalogSink
、或getSourceWithFormat
,createDataFrameFromOptions
或 getSinkWithFormat
create_data_frame_from_options
write_dynamic_frame_from_catalog
,則必須使用此處描述的連接選項來指定這些基本參數。
您可以使用類中的指定方法下面的參數指定卡夫卡的連接選項。GlueContext
-
Scala
-
connectionOptions
:與getSource
、createDataFrameFromOptions
、getSink
搭配使用 -
additionalOptions
:與getCatalogSource
、getCatalogSink
搭配使用。 -
options
:與getSourceWithFormat
、getSinkWithFormat
搭配使用。
-
-
Python
-
connection_options
:與create_data_frame_from_options
、write_dynamic_frame_from_options
搭配使用。 -
additional_options
:與create_data_frame_from_catalog
、write_dynamic_frame_from_catalog
搭配使用。 -
options
:與getSource
、getSink
搭配使用。
-
如需有關串流 ETL 任務的注意事項和限制,請參閱 串流 ETL 注意事項和限制。
主題
設定 Kafka
連接到通過互聯網可用的卡夫卡流沒有 AWS 先決條件。
您可以建立 AWS Glue Kafka 連線來管理您的連線認證。如需詳細資訊,請參閱 為 Apache Kafka 資料串流建立 AWS Glue 連線。在您的 AWS Glue 工作組態中,提供 ConnectionName
做為其他網路連線,然後在您的方法呼叫中,提供連線名稱
給參數。connectionName
在某些情況下,您需要設定其他先決條件:
-
如果搭配 IAM 身分驗證使用 Amazon Managed Streaming for Apache Kafka,您會需要適當的 IAM 組態。
-
如果搭配 Amazon VPC 使用 Amazon Managed Streaming for Apache Kafka,您會需要適當的 Amazon VPC 組態。您必須建立可提供 Amazon VPC 連線資訊的 AWS Glue 連線。您需要工作組態,才能將 AWS Glue 連線納入為其他網路連線。
如需有關串流 ETL 任務先決條件的詳細資訊,請參閱 在 AWS Glue 中串流 ETL 任務。
範例:從 Kafka 串流讀取
搭配 forEachBatch 使用。
Kafka 串流來源範例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
範例:寫入卡夫卡串流
寫信給卡夫卡的例子:
使用該getSink
方法的示例:
data_frame_datasource0 = glueContext.getSink( connectionType="kafka", connectionOptions={ JsonOptions("""{ "connectionName": "ConfluentKafka", "classification": "json", "topic": "kafka-auth-topic", "typeOfData": "kafka"} """)}, transformationContext="dataframe_ApacheKafka_node1711729173428") .getDataFrame()
使用該write_dynamic_frame.from_options
方法的示例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "classification": "json" } data_frame_datasource0 = glueContext.write_dynamic_frame.from_options(connection_type="kafka", connection_options=kafka_options)
Kafka 連線選項參考
閱讀時,請使用以下連接選項"connectionType": "kafka"
:
-
"bootstrap.servers"
(必要) 自舉伺服器 URL 的清單,例如b-1.vpc-test-2.o4q88o.c6.kafka.us-east-1.amazonaws.com:9094
。此選項必須在 API 呼叫中指定,或在 Data Catalog 的資料表中繼資料中定義。 -
"security.protocol"
(必要) 用來與代理程式通訊的協定。可能的值為"SSL"
或"PLAINTEXT"
。 -
"topicName"
(必要) 要訂閱的主題清單 (以逗號分隔)。您必須指定"topicName"
、"assign"
或"subscribePattern"
其中一個。 -
"assign"
:(必要) JSON 字串,指定要消耗的特定TopicPartitions
。您必須指定"topicName"
、"assign"
或"subscribePattern"
其中一個。範例:'{"topicA":[0,1],"topicB":[2,4]}'
-
"subscribePattern"
:(必要) 識別要訂閱的主題清單的 Java regex 字串。您必須指定"topicName"
、"assign"
或"subscribePattern"
其中一個。範例:'topic.*'
-
"classification"
(必要) 記錄中資料使用的檔案格式。除非透過資料型錄提供,否則為必要。 -
"delimiter"
(選用) 當classification
為 CSV 時使用的值分隔符號。預設值為 ",
"。 -
"startingOffsets"
:(選用) 要從中讀取資料的 Kafka 主題的起始位置。可能的值為"earliest"
或"latest"
。預設值為"latest"
。 -
"startingTimestamp"
: (選用,僅適用於 AWS Glue 4.0 版或更新版本) Kafka 主題中要讀取資料的記錄時間戳記。可能的值是yyyy-mm-ddTHH:MM:SSZ
模式中 UTC 格式的時間戳記字串 (其中Z
代表以 +/- 表示的 UTC 時區偏移。例如:"2023-04-04T08:00:00-04:00")。注意: AWS Glue 串流指令集的「連線選項」清單中只能有一個「開始偏移」或「開始時間戳記」,包括這兩個屬性會導致工作失敗。
-
"endingOffsets"
:(選用) 批次查詢結束時的終點。可能值為"latest"
或指定每個TopicPartition
結束偏移的 JSON 字串。對於 JSON 字串,格式為
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
。值-1
作為偏移代表"latest"
。 -
"pollTimeoutMs"
:(選用) 在 Spark 任務執行器中從 Kafka 輪詢資料的逾時 (以毫秒為單位)。預設值為512
。 -
"numRetries"
:(選用) 擷取 Kafka 位移失敗之前,要重試的次數。預設值為3
。 -
"retryIntervalMs"
:(選用) 重試擷取 Kafka 偏移量之前等待的時間 (毫秒)。預設值為10
。 -
"maxOffsetsPerTrigger"
:(選用) 每個觸發間隔所處理之偏移數目上限的速率限制。指定的偏移總數會按比例跨topicPartitions
或不同磁碟區而分割。預設值為 null,這表示消費者讀取所有偏移,直到已知的最新偏移。 -
"minPartitions"
:(選用) 從 Kafka 讀取所需的分割區最小數量。預設值為 null,這表示 Spark 分割區的數量等於 Kafka 分割區的數量。 -
"includeHeaders"
:(選用) 是否包含 Kafka 標頭。當選項設定為「true」時,資料輸出將包含一個名為「glue_streaming_kafka_headers」的額外欄,其類型為Array[Struct(key: String, value: String)]
。預設值為 "false"。此選項能在 AWS Glue 3.0 版或更新版中使用。 -
"schema"
:(當 InferSchema 設定為 false 時為必要) 用於處理承載的架構。如果分類為avro
,提供的架構必須採用 Avro 架構格式。如果分類不是avro
,提供的架構必須採用 DDL 架構格式。以下是架構範例。
-
"inferSchema"
:(選用) 預設值為 'false'。如果設為 'true',將在執行時間時從foreachbatch
承載偵測架構。 -
"avroSchema"
:(已棄用) 使用 Avro 格式時,用於指定 Avro 資料架構的參數。此參數現已棄用。使用schema
參數。 -
"addRecordTimestamp"
︰(選用) 當此選項設定為 'true' 時,資料輸出將包含一個名為 "__src_timestamp" 的額外資料欄,其指示主題收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。 -
"emitConsumerLagMetrics"
: (選擇性) 當選項設為 'true' 時,對於每個批次,它會發出主題接收到的最舊記錄到達時間之間的持續時間的AWS Glue指標。 CloudWatch該指標的名稱是「膠合. 驅動程序. maxConsumerLagInMs」. 預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。
寫入時,請使用以下連接選項"connectionType": "kafka"
:
-
"connectionName"
(必要)用於連接到卡夫卡集群的 AWS Glue 連接名稱(類似於卡夫卡源)。 -
"topic"
(必要) 如果主題欄存在,則除非設定了主題組態選項,否則在將指定資料列寫入 Kafka 時,會使用其值作為主題。也就是說,組topic
態選項會覆寫主題欄。 -
"partition"
(選擇性) 如果指定了有效的分割區編號,partition
將在傳送記錄時使用。如果沒有指定分區,但存
key
在一個分區,將使用密鑰的散列來選擇一個分區。如果
key
既不存在partition
也不存在,則當至少為分區產生 batch .size 字節時,將根據粘性分區這些更改選擇分區。 -
"key"
(選擇性) 如partition
果為 null,則用於分割。 -
"classification"
(選擇性) 記錄中資料使用的檔案格式。我們只支持 JSON,CSV 和阿夫羅。使用 Avro 格式,我們可以提供自定義的 AvroSchema 進行序列化,但請注意,這也需要在源代碼上提供反序列化。否則,默認情況下它使用 Apache AvroSchema 進行序列化。
此外,您可以根據需要通過更新卡夫卡生產者配置參數微調卡夫卡
但是,有一個小的拒絕列表的選項不會生效。如需詳細資訊,請參閱 Kafka 特定