選取您的 Cookie 偏好設定

我們使用提供自身網站和服務所需的基本 Cookie 和類似工具。我們使用效能 Cookie 收集匿名統計資料,以便了解客戶如何使用我們的網站並進行改進。基本 Cookie 無法停用,但可以按一下「自訂」或「拒絕」以拒絕效能 Cookie。

如果您同意,AWS 與經核准的第三方也會使用 Cookie 提供實用的網站功能、記住您的偏好設定,並顯示相關內容,包括相關廣告。若要接受或拒絕所有非必要 Cookie,請按一下「接受」或「拒絕」。若要進行更詳細的選擇,請按一下「自訂」。

Kinesis 連線

焦點模式
Kinesis 連線 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

您可以使用 Kinesis 連線,使用存放在 Data Catalog 資料表中的資訊讀取和寫入 Amazon Kinesis 資料串流,或提供資訊以直接存取資料串流。您可以從 Kinesis 讀取資訊到 Spark DataFrame,然後將其轉換為 AWS Glue DynamicFrame。您可以使用 JSON 格式將 DynamicFrames 寫入 Kinesis。如果您直接存取資料串流,則請使用這些選項來提供如何存取資料串流的相關資訊。

如果您使用 getCatalogSourcecreate_data_frame_from_catalog 來取用來自 Kinesis 串流來源的記錄,則該任務具有 Data Catalog 資料庫和資料表名稱資訊,並可以使用它來獲取一些從 Kinesis 串流來源讀取的基本參數。如果使用 getSourcegetSourceWithFormatcreateDataFrameFromOptionscreate_data_frame_from_options,則您必須使用此處描述的連線選項來指定這些基本參數。

您可以使用 GlueContext 類別中指定方法的下列引數來指定 Kinesis 的連線選項。

  • Scala

    • connectionOptions:與 getSourcecreateDataFrameFromOptionsgetSink 搭配使用

    • additionalOptions:與 getCatalogSourcegetCatalogSink 搭配使用。

    • options:與 getSourceWithFormatgetSinkWithFormat 搭配使用。

  • Python

    • connection_options:與 create_data_frame_from_optionswrite_dynamic_frame_from_options 搭配使用。

    • additional_options:與 create_data_frame_from_catalogwrite_dynamic_frame_from_catalog 搭配使用。

    • options:與 getSourcegetSink 搭配使用。

如需有關串流 ETL 任務的注意事項和限制,請參閱 串流 ETL 注意事項和限制

設定 Kinesis

若要連線至 Glue Spark 任務中的 Kinesis AWS 資料串流,您需要一些先決條件:

  • 如果讀取,Glue AWS 任務必須具有 Kinesis 資料串流的讀取存取層級 IAM 許可。

  • 如果寫入,Glue AWS 任務必須具有 Kinesis 資料串流的寫入存取層級 IAM 許可。

在某些情況下,您需要設定其他先決條件:

  • 如果您的 AWS Glue 任務設定為其他網路連線 (通常連接到其他資料集),且其中一個連線提供 Amazon VPC 網路選項,這將引導您的任務透過 Amazon VPC 進行通訊。在這種情況下,您還需要將 Kinesis 資料串流設定為透過 Amazon VPC 進行通訊。為此,您可以建立 Amazon VPC 與 Kinesis 資料串流之間的介面 VPC 端點。如需詳細資訊,請參閱 Using Kinesis Data Streams with Interface VPC Endpoints

  • 在另一個帳戶中指定 Amazon Kinesis Data Streams 時,您必須設定角色和政策以允許跨帳戶存取。如需詳細資訊,請參閱範例:從不同帳戶中的 Kinesis 串流讀取

如需有關串流 ETL 任務先決條件的詳細資訊,請參閱 在 AWS Glue 中串流 ETL 任務

範例:從 Kinesis 串流讀取

範例:從 Kinesis 串流讀取

搭配 forEachBatch 使用。

Amazon Kinesis 串流來源範例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

範例:寫入 Kinesis 串流

範例:從 Kinesis 串流讀取

搭配 forEachBatch 使用。

Amazon Kinesis 串流來源範例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Kinesis 連線選項參考

指定 Amazon Kinesis Data Streams 的連線選項。

針對 Kinesis 串流資料來源使用下列的連線選項:

  • "streamARN" (必要) 用於讀取/寫入。Kinesis 資料串流的 ARN。

  • "classification" (讀取時為必要) 用於讀取。記錄中資料使用的檔案格式。除非透過資料型錄提供,否則為必要。

  • "streamName" – (選用) 用於讀取。要從中讀取的 Kinesis 資料串流名稱。與 endpointUrl 搭配使用。

  • "endpointUrl" – (選用) 用於讀取。預設:"https://kinesis.us-east-1.amazonaws.com"。Kinesis 串流的 AWS 端點。除非您要連線到特殊區域,否則無需變更此設定。

  • "partitionKey" – (選用) 用於寫入。在產生記錄時使用的 Kinesis 分割區索引鍵。

  • "delimiter" (選用) 用於讀取。當 classification 為 CSV 時使用的值分隔符號。預設值為 ","。

  • "startingPosition":(選用) 用於讀取。Kinesis 資料串流中要從中讀取資料的起始位置。可能的值包括 "latest""trim_horizon""earliest"yyyy-mm-ddTHH:MM:SSZ 模式中 UTC 格式的時間戳記字串 (其中 Z 代表以 +/- 表示的 UTC 時區偏移。例如:"2023-04-04T08:00:00-04:00")。預設值為 "latest"。注意:僅 AWS Glue 4.0 版或更新版本"startingPosition"支援 UTC 格式的時間戳記字串。

  • "failOnDataLoss":(選用) 如果有任何作用中的碎片遺失或過期,則任務失敗。預設值為 "false"

  • "awsSTSRoleARN":(選用) 用於讀取/寫入。要使用 () 擔任的角色的 Amazon Resource Name AWS Security Token Service (ARN AWS STS)。此角色必須具有描述或讀取 Kinesis 資料串流記錄操作的許可。存取不同帳戶中的資料串流時,您必須使用此參數。搭配 "awsSTSSessionName" 使用。

  • "awsSTSSessionName":(選用) 用於讀取/寫入。使用 AWS STS擔任角色之工作階段的識別符。存取不同帳戶中的資料串流時,您必須使用此參數。搭配 "awsSTSRoleARN" 使用。

  • "awsSTSEndpoint":(選用) 使用 擔任的角色連線至 Kinesis 時要使用的 AWS STS 端點。這允許在 VPC 中使用區域 AWS STS 端點,這是預設全域端點無法做到的。

  • "maxFetchTimeInMs":(選用) 用於讀取。任務執行器從 Kinesis 資料串流讀取目前批次記錄所花費的時間上限,以毫秒 (ms) 為單位指定。在此期間可以進行多次 GetRecords API 呼叫。預設值為 1000

  • "maxFetchRecordsPerShard":(選用) 用於讀取。每個微型批次 Kinesis 資料串流中每個碎片要擷取的記錄數量上限。注意:如果串流任務已經從 Kinesis 讀取額外記錄 (在相同的取得記錄呼叫中),用戶端可以超過此限制。如果 maxFetchRecordsPerShard需要嚴格,則需要 的倍數maxRecordPerRead。預設值為 100000

  • "maxRecordPerRead":(選用) 用於讀取。要從每個 getRecords 操作的 Kinesis 資料串流中擷取的記錄數量上限。預設值為 10000

  • "addIdleTimeBetweenReads":(選用) 用於讀取。增加兩個連續 getRecords 操作之間的時間延遲。預設值為 "False"。此選項僅在 Glue 2.0 及以上版本上才可設定。

  • "idleTimeBetweenReadsInMs":(選用) 用於讀取。兩個連續 getRecords 操作的最小延遲時間,以毫秒為單位指定。預設值為 1000。此選項僅在 Glue 2.0 及以上版本上才可設定。

  • "describeShardInterval":(選用) 用於讀取。指令碼考慮重新分片的兩個 ListShards API 呼叫之間的最小時間間隔。如需詳細資訊,請參閱 Amazon Kinesis Data Streams 開發人員指南中的重新分片的策略。預設值為 1s

  • "numRetries":(選用) 用於讀取。Kinesis Data Streams API 請求的重試數上限。預設值為 3

  • "retryIntervalMs":(選用) 用於讀取。重試 Kinesis Data Streams API 呼叫之前的冷卻時間期間 (以毫秒為單位)。預設值為 1000

  • "maxRetryIntervalMs":(選用) 用於讀取。Kinesis Data Streams API 呼叫之兩次重試之間的最大冷卻時間期間 (以毫秒為單位)。預設值為 10000

  • "avoidEmptyBatches":(選用) 用於讀取。避免建立空白微批次任務,方法是在批次開始之前檢查 Kinesis 資料串流中是否有未讀取的資料。預設值為 "False"

  • "schema":(在 inferSchema 設定為 false 時為必要) 用於讀取。用於處理承載的結構描述。如果分類為 avro,提供的架構必須採用 Avro 架構格式。如果分類不是 avro,提供的架構必須採用 DDL 架構格式。

    以下是架構範例。

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    `column1` INT, `column2` STRING , `column3` FLOAT
  • "inferSchema":(選用) 用於讀取。預設值為 'false'。如果設為 'true',將在執行時間時從 foreachbatch 承載偵測架構。

  • "avroSchema":(已棄用) 用於讀取。使用 Avro 格式時,用於指定 Avro 資料架構的參數。此參數現已棄用。使用 schema 參數。

  • "addRecordTimestamp":(選用) 用於讀取。當此選項設定為 'true' 時,資料輸出將包含一個名為 "__src_timestamp" 的額外資料欄,其指示串流收到相應記錄的時間。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。

  • "emitConsumerLagMetrics":(選用) 用於讀取。當選項設定為 "true" 時,在介於串流收到最舊記錄與其在 AWS Glue 中到達 CloudWatch 的時間之間的持續時間內,將會針對每個批次發出指標。指標的名稱為 "glue.driver.streaming.maxConsumerLagInMs"。預設值為 'false'。在 AWS Glue 4.0 版或更新版中支援此選項。

  • "fanoutConsumerARN":(選用) 用於讀取。Kinesis 串流取用者的 ARN,適用於 streamARN 中指定的串流。用於啟用 Kinesis 連線的強化廣發功能模式。如需有關使用強化廣發功能使用 Kinesis 串流的詳細資訊,請參閱 在 Kinesis 串流任務中使用強化廣發功能

  • "recordMaxBufferedTime" – (選用) 用於寫入。預設:1000 (毫秒)。在等待寫入時,記錄受到緩衝的最長時間。

  • "aggregationEnabled" – (選用) 用於寫入。預設:true。指定是否應在將記錄傳送至 Kinesis 前先彙整記錄。

  • "aggregationMaxSize" – (選用) 用於寫入。預設:51200 (位元組)。若記錄大於此限制,則其會略過彙整工具。請注意,Kinesis 會強制執行 50 KB 的記錄大小限制。若您將此值設定為超過 50 KB,Kinesis 將會拒絕過大的記錄。

  • "aggregationMaxCount" – (選用) 用於寫入。預設:4294967295。要匯入彙整記錄的項目數量上限。

  • "producerRateLimit" – (選用) 用於寫入。預設:150 (%)。作為後端限制的百分比來限制從單一生產者 (例如您的任務) 傳送的每個碎片輸送量。

  • "collectionMaxCount" – (選用) 用於寫入。預設:500。要匯入 PutRecords 請求的項目數量上限。

  • "collectionMaxSize" – (選用) 用於寫入。預設:5242880 (位元組)。使用 PutRecords 請求傳送的最大資料量。

隱私權網站條款Cookie 偏好設定
© 2025, Amazon Web Services, Inc.或其附屬公司。保留所有權利。