本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
Kinesis 連線
您可以使用 Kinesis 連線讀取和寫入 Amazon Kinesis 資料串流,使用儲存在資料目錄表格中的資訊,或提供直接存取資料串流的資訊。您可以從 Kinesis 讀取信息到火花 DataFrame,然後將其轉換為 AWS Glue DynamicFrame。您可以 DynamicFrames 用一種JSON格式寫入 Kinesis。如果您直接存取資料串流,則請使用這些選項來提供如何存取資料串流的相關資訊。
如果您使用 getCatalogSource
或 create_data_frame_from_catalog
來取用來自 Kinesis 串流來源的記錄,則該任務具有 Data Catalog 資料庫和資料表名稱資訊,並可以使用它來獲取一些從 Kinesis 串流來源讀取的基本參數。如果使用 getSource
、getSourceWithFormat
、createDataFrameFromOptions
或 create_data_frame_from_options
,則您必須使用此處描述的連線選項來指定這些基本參數。
您可以使用 GlueContext
類別中指定方法的下列引數來指定 Kinesis 的連線選項。
-
Scala
-
connectionOptions
:與getSource
、createDataFrameFromOptions
、getSink
搭配使用 -
additionalOptions
:與getCatalogSource
、getCatalogSink
搭配使用。 -
options
:與getSourceWithFormat
、getSinkWithFormat
搭配使用。
-
-
Python
-
connection_options
:與create_data_frame_from_options
、write_dynamic_frame_from_options
搭配使用。 -
additional_options
:與create_data_frame_from_catalog
、write_dynamic_frame_from_catalog
搭配使用。 -
options
:與getSource
、getSink
搭配使用。
-
如需有關串流ETL工作的注意事項和限制,請參閱串流 ETL 注意事項和限制。
設定 Kinesis
若要連線到 AWS Glue Spark 工作中的 Kinesis 資料串流,您需要一些先決條件:
如果讀取, AWS Glue 工作必須具有 Kinesis 資料串流的讀取存取等級IAM權限。
如果要寫入, AWS Glue 工作必須具有 Kinesis 資料串流的寫入存取層級IAM權限。
在某些情況下,您需要設定其他先決條件:
-
如果您的 AWS Glue 任務設定了「其他網路連線」(通常是用來連接到其他資料集),而其中一個連線提供 Amazon VPC 網路選項,則會引導您的任務透過 Amazon 進行通訊VPC。在這種情況下,您還需要設定 Kinesis 資料串流,以便透過 Amazon VPC 進行通訊。您可以在 Amazon VPC 和 Kinesis 資料串流之間建立介面VPC端點來執行此操作。如需詳細資訊,請參閱將 Kinesis Data Streams 與介面VPC端點搭配使用。
-
在另一個帳戶中指定 Amazon Kinesis Data Streams 時,您必須設定角色和政策以允許跨帳戶存取。如需詳細資訊,請參閱範例:從不同帳戶中的 Kinesis 串流讀取。
如需串流ETL工作先決條件的詳細資訊,請參閱在 AWS Glue 中串流 ETL 任務。
範例:從 Kinesis 串流讀取
範例:從 Kinesis 串流讀取
搭配 forEachBatch 使用。
Amazon Kinesis 串流來源範例:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
範例:寫入 Kinesis 串流
範例:從 Kinesis 串流讀取
搭配 forEachBatch 使用。
Amazon Kinesis 串流來源範例:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Kinesis 連線選項參考
指定 Amazon Kinesis Data Streams 的連線選項。
針對 Kinesis 串流資料來源使用下列的連線選項:
-
"streamARN"
(必要) 用於讀取/寫入。Kinesis 資料串流ARN的。 -
"classification"
(讀取時為必要) 用於讀取。記錄中資料使用的檔案格式。除非透過資料型錄提供,否則為必要。 -
"streamName"
– (選用) 用於讀取。要從中讀取的 Kinesis 資料串流名稱。與endpointUrl
搭配使用。 -
"endpointUrl"
– (選用) 用於讀取。預設值:"https://kinesis.us-east-1.amazonaws.com"。Kinesis 串流的 AWS 端點。除非您要連線到特殊區域,否則無需變更此設定。 -
"partitionKey"
– (選用) 用於寫入。在產生記錄時使用的 Kinesis 分割區索引鍵。 -
"delimiter"
(選用) 用於讀取。當classification
是時使用的值分隔符號CSV。預設值為 ",
"。 -
"startingPosition"
:(選用) 用於讀取。Kinesis 資料串流中要從中讀取資料的起始位置。可能的值為"latest"
、"trim_horizon"
"earliest"
、或模UTC式中格式的時間戳記字串yyyy-mm-ddTHH:MM:SSZ
(其中Z
代表具有 +/-的UTC時區偏移量。 例如:「4 月 4 日上午 8 時至 4 時」)。預設值為"latest"
。注意:只有 AWS Glue 4.0 或更新版本"startingPosition"
才支援UTC格式的時間戳記字串。 -
"failOnDataLoss"
:(選用) 如果有任何作用中的碎片遺失或過期,則任務失敗。預設值為"false"
。 -
"awsSTSRoleARN"
:(選用) 用於讀取/寫入。角色的 Amazon 資源名稱 (ARN) 假設使用 AWS Security Token Service (AWS STS)。此角色必須具有描述或讀取 Kinesis 資料串流記錄操作的許可。存取不同帳戶中的資料串流時,您必須使用此參數。搭配"awsSTSSessionName"
使用。 -
"awsSTSSessionName"
:(選用) 用於讀取/寫入。使用 AWS STS擔任角色之工作階段的識別符。存取不同帳戶中的資料串流時,您必須使用此參數。搭配"awsSTSRoleARN"
使用。 -
"awsSTSEndpoint"
: (選擇性) 以假定角色連線到 Kinesis 時要使用的 AWS STS 端點。這允許在中使用區域 AWS STS 端點VPC,這對於默認全局端點是不可能的。 -
"maxFetchTimeInMs"
:(選用) 用於讀取。工作執行程式從 Kinesis 資料串流讀取目前批次記錄所花費的時間上限 (毫秒)。在這段時間內可以GetRecords
API撥打多個電話。預設值為1000
。 -
"maxFetchRecordsPerShard"
:(選用) 用於讀取。每個微批次在 Kinesis 資料串流中,每個碎片可擷取的最大記錄數。注意:如果串流工作已讀取 Kinesis 的額外記錄 (在相同的 Get-record 呼叫中),用戶端可能會超過此限制。如果maxFetchRecordsPerShard
需要嚴格,那麼它需要是maxRecordPerRead
. 預設值為100000
。 -
"maxRecordPerRead"
:(選用) 用於讀取。要從每個getRecords
操作的 Kinesis 資料串流中擷取的記錄數量上限。預設值為10000
。 -
"addIdleTimeBetweenReads"
:(選用) 用於讀取。增加兩個連續getRecords
操作之間的時間延遲。預設值為"False"
。此選項僅在 Glue 2.0 及以上版本上才可設定。 -
"idleTimeBetweenReadsInMs"
:(選用) 用於讀取。兩個連續getRecords
操作的最小延遲時間,以毫秒為單位指定。預設值為1000
。此選項僅在 Glue 2.0 及以上版本上才可設定。 -
"describeShardInterval"
:(選用) 用於讀取。兩次ListShards
API調用之間的最小時間間隔,以供腳本考慮重新分片。如需詳細資訊,請參閱 Amazon Kinesis Data Streams 開發人員指南中的重新分片的策略。預設值為1s
。 -
"numRetries"
:(選用) 用於讀取。Kinesis Data Streams API 要求的重試次數上限。預設值為3
。 -
"retryIntervalMs"
:(選用) 用於讀取。重試 Kinesis Data Streams 呼叫之前的冷卻期間 (以毫秒為單位指定)。API預設值為1000
。 -
"maxRetryIntervalMs"
:(選用) 用於讀取。Kinesis Data Streams 呼叫兩次重試之間的最大冷卻期間 (以毫秒為單位指定)。API預設值為10000
。 -
"avoidEmptyBatches"
:(選用) 用於讀取。避免建立空白微批次任務,方法是在批次開始之前檢查 Kinesis 資料串流中是否有未讀取的資料。預設值為"False"
。 -
"schema"
: (當 inferSchema 設定為 false 時需要) 用於讀取。用於處理承載的結構描述。如果分類為avro
,提供的架構必須採用 Avro 架構格式。如果分類不是提供avro
的模式必須是DDL模式格式。以下是架構範例。
-
"inferSchema"
:(選用) 用於讀取。預設值為 'false'。如果設為 'true',將在執行時間時從foreachbatch
承載偵測架構。 -
"avroSchema"
:(已棄用) 用於讀取。使用 Avro 格式時,用於指定 Avro 資料架構的參數。此參數現已棄用。使用schema
參數。 -
"addRecordTimestamp"
:(選用) 用於讀取。當此選項設定為 'true' 時,資料輸出將包含一個名為 "__src_timestamp" 的額外資料欄,其指示串流收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。 -
"emitConsumerLagMetrics"
:(選用) 用於讀取。當該選項設置為 'true' 時,對於每個批次,它將發出流接收到的最舊記錄到達時間之間的持續時間的AWS Glue指標。 CloudWatch該指標的名稱是「膠合. 驅動程序. maxConsumerLagInMs」. 預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。 -
"fanoutConsumerARN"
:(選用) 用於讀取。中streamARN
指定ARN之串流的 Kinesis 串流用戶。用於啟用 Kinesis 連線的強化廣發功能模式。如需有關使用強化廣發功能使用 Kinesis 串流的詳細資訊,請參閱 在 Kinesis 串流任務中使用強化廣發功能。 -
"recordMaxBufferedTime"
– (選用) 用於寫入。預設:1000 (毫秒)。在等待寫入時,記錄受到緩衝的最長時間。 -
"aggregationEnabled"
– (選用) 用於寫入。預設:true。指定是否應在將記錄傳送至 Kinesis 前先彙整記錄。 -
"aggregationMaxSize"
– (選用) 用於寫入。預設:51200 (位元組)。若記錄大於此限制,則其會略過彙整工具。請注意,Kinesis 會強制執行 50 KB 的記錄大小限制。若您將此值設定為超過 50 KB,Kinesis 將會拒絕過大的記錄。 -
"aggregationMaxCount"
– (選用) 用於寫入。預設:4294967295。要匯入彙整記錄的項目數量上限。 -
"producerRateLimit"
– (選用) 用於寫入。預設:150 (%)。作為後端限制的百分比來限制從單一生產者 (例如您的任務) 傳送的每個碎片輸送量。 -
"collectionMaxCount"
– (選用) 用於寫入。預設:500。要打包到 PutRecords 請求中的最大項目數。 -
"collectionMaxSize"
– (選用) 用於寫入。預設:5242880 (位元組)。與 PutRecords 請求一起發送的最大數據量。