Amazon Athena Apache Kafka 連接器 - Amazon Athena

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Amazon Athena Apache Kafka 連接器

適用於 Apache 卡夫卡的 Amazon Athena 連接器使 Amazon Athena 能夠針對您的 Apache 卡夫卡主SQL題執行查詢。使用此連接器可以在 Athena 中以資料表的形式檢視 Apache Kafka 主題,並以資料列的形式檢視訊息。

必要條件

使用 Athena 主控台或 AWS Serverless Application Repository,將連接器部署到您的 AWS 帳戶 。如需詳細資訊,請參閱 部署資料來源連接器使用部署 AWS Serverless Application Repository 資料來源連接器

限制

  • 不支DDL援寫入作業。

  • 任何相關的 Lambda 限制。如需詳細資訊,請參閱《AWS Lambda 開發人員指南》中的 Lambda 配額

  • 篩選條件中的日期和時間戳記資料類型必須轉換為適當的資料類型。

  • 檔案類型不支援日期和時間戳記資料CSV類型,且會被視為 varchar 值。

  • 不支援對應至巢狀JSON欄位。連接器僅映射最上層欄位。

  • 連接器不支援複雜類型。複雜類型會轉譯為字串。

  • 若要擷取或處理複雜JSON值,請使用 Athena 提供的JSON相關函數。如需詳細資訊,請參閱 從字符串中提取JSON數據

  • 連接器不支援存取 Kafka 訊息中繼資料。

條款

  • 中繼資料處理常式 - 從資料庫執行個體中擷取中繼資料的 Lambda 處理常式。

  • 記錄處理常式 - 從資料庫執行個體中擷取資料記錄的 Lambda 處理常式。

  • 複合處理常式 - 從資料庫執行個體中擷取中繼資料和資料記錄的 Lambda 處理常式。

  • Kafka 端點 – 與 Kafka 執行個體建立連線的文字字串。

叢集相容性

Kafka 連接器可搭配下列叢集類型使用。

連線至 Confluent

連線到 Confluent 需要以下步驟:

  1. 從結合生成一個API密鑰。

  2. 將 Confluent 密API鑰的用戶名和密碼存儲到 AWS Secrets Manager.

  3. 提供 Kafka 連接器中 secrets_manager_secret 環境變數的密碼名稱。

  4. 請遵循本文件 設定 Kafka 連接器 一節中的步驟。

支援的身分驗證方法

連接器支援下列身分驗證方法。

  • SSL

  • SASL/SCRAM

  • SASL/PLAIN

  • SASL/PLAINTEXT

  • 不是 _ AUTH

  • 自我管理的卡夫卡和匯流平台 —SSL,SASL/,/SCRAM, NO_ SASL PLAINTEXT AUTH

  • 自我管理的卡夫卡和匯流雲 —/SASLPLAIN

如需詳細資訊,請參閱 設定 Athena Kafka 連接器的身分驗證

支援的輸入資料格式

連接器支援以下輸入資料格式。

  • JSON

  • CSV

  • AVRO

  • PROTOBUF (PROTOCOL BUFFERS)

參數

使用本節提及的 Lambda 環境變數來設定 Athena Kafka 連接器。

  • auth_type – 指定叢集的身分驗證類型。連接器支援下列身分驗證類型:

    • NO_ AUTH — 直接 Connect 至 Kafka (例如,部署在不使用驗證的EC2執行個體上部署的 Kafka 叢集)。

    • SASL_ SSL _ PLAIN — 此方法使用SASL_SSL安全通訊協定和PLAINSASL機制。如需詳細資訊,請參閱 Apache Kafka 文件中的SASL組態。

    • SASL_ PLAINTEXT _ PLAIN — 此方法使用SASL_PLAINTEXT安全通訊協定和PLAINSASL機制。如需詳細資訊,請參閱 Apache Kafka 文件中的SASL組態。

    • SASL_ _ SSL SCRAM _ SHA512 — 您可以使用此驗證類型來控制對 Apache Kafka 叢集的存取。此方法將使用者名稱和密碼儲存在中 AWS Secrets Manager。秘密必須與 Kafka 叢集相關聯。如需詳細資訊,請參閱 Apache Kafka 說明文件SCRAM中的使用SASL/進行驗證

    • SASL_ _ PLAINTEXT SCRAM _ SHA512 — 此方法使用SASL_PLAINTEXT安全通訊協定和SCRAM_SHA512 SASL機制。此方法使用儲存在中的使用者名稱和密碼 AWS Secrets Manager。如需詳細資訊,請參閱 Apache Kafka 文件的SASL設定一節。

    • SSL-SSL 身份驗證使用密鑰存儲和信任存儲文件與 Apache 卡夫卡集群連接。您必須產生信任存放區和金鑰存放區檔案,將其上傳至 Amazon S3 儲存貯體,並在部署連接器時提供 Amazon S3 參考。金鑰存放區、信任存放區和SSL金鑰儲存在中 AWS Secrets Manager。部署連接器時,您的用戶端必須提供 AWS 秘密金鑰。如需詳細資訊,請參閱 Apache Kafka 說明文件SSL中的使用加密和驗證

      如需詳細資訊,請參閱 設定 Athena Kafka 連接器的身分驗證

  • certificates_s3_reference – 包含憑證 (金鑰存放區和信任存放區檔案) 的 Amazon S3 位置。

  • disable_spill_encryption - (選用) 當設定為 True 時,停用溢出加密。預設為False使溢滿到 S3 的資料使用 AES GCM-來加密,無論是使用隨機產生的金鑰或KMS產生金鑰。停用溢出加密可以提高效能,尤其是如果溢出位置使用伺服器端加密

  • kafka_endpoint – 提供給 Kafka 的端點詳細資訊。

  • 構登錄檔 — 結構描述登錄的URL位址 (例如,)。http://schema-registry.example.org:8081套用至AVROPROTOBUF資料格式。

  • secrets_manager_secret – 用來儲存憑證的 AWS 祕密的名稱。

  • 溢出參數 –Lambda 函數會將不適用記憶體的資料暫時存放 (「溢出」) 至 Amazon S3。由相同 Lambda 函數存取的所有資料庫執行個體溢出到相同的位置。請使用以下資料表中的參數來指定溢出位置。

    參數 描述
    spill_bucket 必要。Lambda 函數可在其中溢出資料的 Amazon S3 儲存貯體的名稱。
    spill_prefix 必要。Lambda 函數可在其中溢出資料的溢出儲存貯體的字首。
    spill_put_request_headers (選擇性) 用於溢出的 Amazon S3 請求的putObject請求標頭和值的JSON編碼映射 (例如,{"x-amz-server-side-encryption" : "AES256"})。如需其他可能的標頭,請參閱 Amazon 簡易儲存服務API參考PutObject中的。
  • 子網路 IDs — 對應至子網路IDs的一或多個子網路,Lambda 函數可用來存取您的資料來源。

    • 公用卡夫卡叢集或標準 Confluent 雲端叢集 — 將連接器與具有閘道的私有子網路建立關聯。NAT

    • Confluent 雲端叢集與私有連線 – 將連接器與具有對 Confluent 雲端叢集的路由的私有子網路建立關聯。

      • 對於 AWS Transit Gateway,子網路必須位於連接至 Confluent Cloud 使用的相同傳輸閘道的子網路中。VPC

      • VPC等互連,子網路必須位於與 Confluent 雲端對等VPC的子網路中。VPC

      • 對於 AWS PrivateLink,子網路必須位於具VPC有路由到連線到 Confluent 雲VPC端的端點。

注意

如果您將連接器部署到以存取私有資源,並且還想要連線到可公開存取的服務 (例如 Confluent),則必須將連接器與具有NAT閘道的私有子網路產生關聯。VPC如需詳細資訊,請參閱 Amazon VPC 使用者指南中的NAT閘道

支援的資料類型

下表顯示 Kafka 和 Apache Arrow 支援的相應資料類型。

Kafka Arrow
CHAR VARCHAR
VARCHAR VARCHAR
TIMESTAMP MILLISECOND
DATE DAY
BOOLEAN BOOL
SMALLINT SMALLINT
INTEGER INT
BIGINT BIGINT
DECIMAL FLOAT8
DOUBLE FLOAT8

分割區和分隔

Kafka 主題會分為多個分割區。每個分割區都已排序。分割區中的每個訊息都有一個稱為位移的增量 ID。每個 Kafka 分割區可再細分為多個分隔,用於並行處理。資料在 Kafka 叢集中設定的保留期間內可供使用。

最佳實務

最佳實務是在您查詢 Athena 時使用述詞下推,如以下範例所示。

SELECT * FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE integercol = 2147483647
SELECT * FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE timestampcol >= TIMESTAMP '2018-03-25 07:30:58.878'

設定 Kafka 連接器

您必須先設定 Apache Kafka 叢集、使用 AWS Glue 結構描述登錄檔來定義結構描述,以及為連接器設定身分驗證,才能使用連接器。

使用 AWS Glue 結構描述登錄檔時,請注意以下幾點:

  • 請確定 AWS Glue 結構描述登錄檔的 Description (描述) 欄位中的文字包含字串 {AthenaFederationKafka}。對於搭配 Amazon Athena 卡夫卡連接器使用的 AWS Glue 登錄,需要此標記字串。

  • 為了獲得最佳效能,請僅使用小寫作為資料庫名稱和資料表名稱。使用混合大小寫會導致連接器執行運算密集程度較高的不區分大小寫搜尋。

若要設定您的 Apache 卡夫卡環境和 AWS Glue 結構描述登錄
  1. 設定 Apache Kafka 環境。

  2. 以JSON格式將 Kafka 主題描述檔案 (也就是其結構描述) 上傳至結 AWS Glue 構描述登錄檔。如需詳細資訊,請參閱 AWS Glue 開發人員指南中的與 AWS Glue 結構描述登錄整合

  3. 若要在結構描述登錄檔中定義結構描述時使用AVROPROTOBUF資料格式:AWS Glue

    • 對於結構描述名稱,請在與原始相同的大小寫中輸入 Kafka 主題名稱。

    • 在 [資料格式] 中,選擇 Apache Avro通訊協定緩衝區

    如需範例結構描述,請參閱下一節。

將結構描述上傳至 AWS Glue 結構描述登錄檔時,請使用本節中的範例格式。

JSON類型結構描述範

在下列範例中,要在結構描述登錄檔中建立的 AWS Glue 結構描述指定json為的值,dataFormat並使用datatypejsontopicName

注意

topicName 的值應使用與 Kafka 中的主題名稱相同的大小寫。

{ "topicName": "datatypejson", "message": { "dataFormat": "json", "fields": [ { "name": "intcol", "mapping": "intcol", "type": "INTEGER" }, { "name": "varcharcol", "mapping": "varcharcol", "type": "VARCHAR" }, { "name": "booleancol", "mapping": "booleancol", "type": "BOOLEAN" }, { "name": "bigintcol", "mapping": "bigintcol", "type": "BIGINT" }, { "name": "doublecol", "mapping": "doublecol", "type": "DOUBLE" }, { "name": "smallintcol", "mapping": "smallintcol", "type": "SMALLINT" }, { "name": "tinyintcol", "mapping": "tinyintcol", "type": "TINYINT" }, { "name": "datecol", "mapping": "datecol", "type": "DATE", "formatHint": "yyyy-MM-dd" }, { "name": "timestampcol", "mapping": "timestampcol", "type": "TIMESTAMP", "formatHint": "yyyy-MM-dd HH:mm:ss.SSS" } ] } }

CSV類型結構描述範

在下列範例中,要在結構描述登錄檔中建立的 AWS Glue 結構描述指定csv為的值,dataFormat並使用datatypecsvbulktopicNametopicName 的值應使用與 Kafka 中的主題名稱相同的大小寫。

{ "topicName": "datatypecsvbulk", "message": { "dataFormat": "csv", "fields": [ { "name": "intcol", "type": "INTEGER", "mapping": "0" }, { "name": "varcharcol", "type": "VARCHAR", "mapping": "1" }, { "name": "booleancol", "type": "BOOLEAN", "mapping": "2" }, { "name": "bigintcol", "type": "BIGINT", "mapping": "3" }, { "name": "doublecol", "type": "DOUBLE", "mapping": "4" }, { "name": "smallintcol", "type": "SMALLINT", "mapping": "5" }, { "name": "tinyintcol", "type": "TINYINT", "mapping": "6" }, { "name": "floatcol", "type": "DOUBLE", "mapping": "7" } ] } }

AVRO類型結構描述範

下面的例子是用來在模式註冊表中創建AVRO基於模 AWS Glue 式的模式。當您在結構描述登錄中定義 AWS Glue 結構描述時,對於結構描述名稱,您可以使用與原始相同的大小寫輸入 Kafka 主題名稱,對於資料格式,請選擇 Apache Avro。因為您直接在登錄中指定此資訊,因此dataformattopicName欄位不是必要欄位。

{ "type": "record", "name": "avrotest", "namespace": "example.com", "fields": [{ "name": "id", "type": "int" }, { "name": "name", "type": "string" } ] }

PROTOBUF類型結構描述範

下面的例子是用來在模式註冊表中創建PROTOBUF基於模 AWS Glue 式的模式。當您在結構描述登錄中定義 AWS Glue 結構描述時,對於結構描述名稱,您可以在與原始相同的大小寫中輸入 Kafka 主題名稱,對於資料格式,請選擇通訊協定緩衝區。因為您直接在登錄中指定此資訊,因此dataformattopicName欄位不是必要欄位。第一行將結構定義為PROTOBUF。

syntax = "proto3"; message protobuftest { string name = 1; int64 calories = 2; string colour = 3; }

如需有關在結構描述登錄檔中新增登錄和 AWS Glue 結構描述的詳細資訊,請參閱 AWS Glue 說明文件中的結構描述登錄入門

設定 Athena Kafka 連接器的身分驗證

您可以使用各種方法對 Apache 卡夫卡叢集進行驗證,包括SASL/SSLSCRAMPLAIN、SASL/和SASL/。PLAINTEXT

下表顯示連接器的驗證類型,以及每種連接器的安全通訊協定和SASL機制。如需有關詳細資訊,請參閱 Apache Kafka 文件的安全性一節。

auth_type security.protocol sasl.mechanism 叢集類型相容性
SASL_SSL_PLAIN SASL_SSL PLAIN
  • 自我管理的 Kafka

  • Confluent 平台

  • Confluent 雲端

SASL_PLAINTEXT_PLAIN SASL_PLAINTEXT PLAIN
  • 自我管理的 Kafka

  • Confluent 平台

SASL_SSL_SCRAM_SHA512 SASL_SSL SCRAM-SHA-512
  • 自我管理的 Kafka

  • Confluent 平台

SASL_PLAINTEXT_SCRAM_SHA512 SASL_PLAINTEXT SCRAM-SHA-512
  • 自我管理的 Kafka

  • Confluent 平台

SSL SSL N/A
  • 自我管理的 Kafka

  • Confluent 平台

SSL

如果叢集SSL經過驗證,您必須產生信任存放區和金鑰存放區檔案,並將其上傳到 Amazon S3 儲存貯體。部署連接器時,您必須提供此 Amazon S3 參考。金鑰存放區、信任存放區和SSL金鑰會儲存在 AWS Secrets Manager. 您可以在部署連接器時提供 AWS 私密金鑰。

如需有關在 Secrets Manager 中建立祕密的詳細資訊,請參閱建立 AWS Secrets Manager 祕密

若要使用此身分驗證類型,請設定環境變數,如下表所示。

參數 Value
auth_type SSL
certificates_s3_reference 包含憑證的 Amazon S3 位置。
secrets_manager_secret 您的 AWS 密鑰的名稱。

在 Secrets Manager 中建立祕密之後,您可以在 Secrets Manager 主控台中進行檢視。

若要檢視 Secrets Manager 中的祕密
  1. 開啟 Secrets Manager 主控台,位於https://console.aws.amazon.com/secretsmanager/

  2. 在導覽窗格中,選擇 Secrets (祕密)。

  3. Secrets (祕密) 頁面中,選擇祕密的連結。

  4. 在祕密的詳細資訊頁面上,選擇 Retrieve secret value (擷取祕密值)。

    下圖顯示了具有三個金鑰/值對的祕密範例:keystore_passwordtruststore_passwordssl_key_password

    擷取SSL密碼管理員中的密碼

如需與 Kafka SSL 搭配使用的詳細資訊,請參閱 Apache Kafka 說明文件SSL中的使用加密和驗證

SASL/SCRAM

如果您的叢集使用SCRAM驗證,請在部署連接器時提供與叢集相關聯的 Secrets Manager 金鑰。使用者的 AWS 憑證 (祕密金鑰和存取金鑰) 可用於向叢集進行身分驗證。

設定環境變數,如下表所示。

參數 Value
auth_type SASL_SSL_SCRAM_SHA512
secrets_manager_secret 您的 AWS 密鑰的名稱。

下圖顯示了 Secrets Manager 主控台中含有兩個金鑰/值對的祕密範例:一個用於 username,另一個用於 password

擷取密SCRAM碼管理員中的密碼

如需使用SASL/SCRAM搭配 Kafka 使用的詳細資訊,請參閱 Apache Kafka 說明文件SCRAM中的使用SASL/進行驗證

授權資訊

使用此連接器即表示您確認已包含協力廠商元件,其清單可在此連接器的 pom.xml 檔案中找到,並同意 GitHub .com 上 LICENSE.txt 檔案中提供的個別協力廠商授權中的條款。

其他資源

如需有關此連接器的其他資訊,請造訪 GitHub .com 上的對應網站