開始使用 Amazon Managed Streaming for Apache Kafka 中的串流擷取 - Amazon Redshift

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

開始使用 Amazon Managed Streaming for Apache Kafka 中的串流擷取

Amazon Redshift 串流擷取的目的是簡化直接從串流服務擷取串流資料至 Amazon Redshift 或 Amazon Redshift Serverless 的程序。這適用於 Amazon MSK 和 Amazon MSK Serverless 以及 Kinesis。Amazon Redshift 串流擷取不需要在將串流擷取到 Redshift 之前,在 Amazon S3 中暫存 Kinesis Data Streams 串流或 Amazon MSK 主題。

在技術層級上,串流擷取 (來自 Amazon Kinesis Data Streams 和 Amazon Managed Streaming for Apache Kafka) 可提供低延遲、高速擷取的串流或主題資料至 Amazon Redshift 具體化視觀表。在設定之後,您可以使用具體化視觀表重新整理採用大量資料。

執行下列步驟,為 Amazon MSK 設定 Amazon Redshift 串流擷取:

  1. 建立對應至串流資料來源的外部結構描述。

  2. 建立參照外部結構描述的具體化視觀表。

在設定 Amazon Redshift 串流擷取之前,您必須擁有可用的 Amazon MSK 來源。如果您沒有來源,請按照開始使用 Amazon MSK 中的說明進行操作。

注意

串流擷取和 Amazon Redshift Serverless - 本主題中的組態步驟同時適用於佈建的 Amazon Redshift 叢集和 Amazon Redshift Serverless。如需詳細資訊,請參閱 串流擷取行為和資料類型

設定 IAM 許可並從 Kafka 執行串流擷取

假設您有可用的 Amazon MSK 叢集,第一步是使用 CREATE EXTERNAL SCHEMA 在 Redshift 中定義結構描述,並將 Kafka 主題參照為資料來源。之後,若要存取主題中的資料,請在具體化視觀表中定義 STREAM。您可以使用半結構化 SUPER 格式儲存主題中的記錄,或定義結構描述,將資料轉換為 Amazon Redshift 資料類型。當您查詢具體化視觀表時,傳回的記錄是主題的 point-in-time 檢視表。

  1. 使用信任政策來建立 IAM 角色,允許 Amazon Redshift 叢集或 Amazon Redshift Serverless 擔任該角色。如需如何為 IAM 角色設定信任政策的詳細資訊,請參閱授權 Amazon Redshift 代表您存取其他 AWS 服務。建立角色之後,該角色應具有下列 IAM 政策,該政策提供與 Amazon MSK 叢集通訊的許可。如果您使用 Amazon MSK,則需要的政策取決於叢集上使用的身份驗證方法。有關 Amazon MSK 中可用的身份驗證方法,請參閱 Apache Kafka API 的身份驗證和授權

    Amazon MSK 使用未經驗證存取的 IAM 政策:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "kafka:GetBootstrapBrokers" ], "Resource": "*" } ] }

    使用 IAM 身份驗證時適用於 Amazon MSK 的 IAM 政策:

    { "Version": "2012-10-17", "Statement": [ { "Sid": "MSKIAMpolicy", "Effect": "Allow", "Action": [ "kafka-cluster:ReadData", "kafka-cluster:DescribeTopic", "kafka-cluster:Connect" ], "Resource": [ "arn:aws:kafka:*:0123456789:cluster/MyTestCluster/*", "arn:aws:kafka:*:0123456789:topic/MyTestCluster/*" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:*:0123456789:group/MyTestCluster/*" ] }, { "Sid": "MSKPolicy", "Effect": "Allow", "Action": [ "kafka:GetBootstrapBrokers" ], "Resource": "*" } ] }
  2. 檢查您的 VPC,並確認您的 Amazon Redshift 叢集或 Amazon Redshift Serverless 具有前往您 Amazon MSK 叢集的路徑。Amazon MSK 叢集的輸入安全群組規則應允許您的 Amazon Redshift 叢集或 Amazon Redshift Serverless 工作群組的安全群組。當您使用 Amazon MSK 時,您指定的連接埠取決於叢集上使用的身份驗證方法。如需詳細資訊,請參閱連接埠資訊從 VPC 內 AWS 外存取。

    請注意,串流擷取不支援使用 MTL 進行用戶端驗證。如需詳細資訊,請參閱限制

    下表顯示可從 Amazon MSK 設定串流擷取的免費組態選項:

    Amazon Redshift 組態 Amazon MSK 組態 在 Redshift 和 Amazon MSK 之間開啟的連接埠
    AUTHENTICATION NONE TLS 傳輸已停用 9092
    AUTHENTICATION NONE 已啟用的 TLS 傳輸 9094
    AUTHENTICATION IAM IAM 9098/9198

    Amazon Redshift 身份驗證是在 CREATE EXTERNAL SCHEMA 陳述式中設定的。

    如果 Amazon MSK 叢集已啟用相互傳輸層安全性 (mTLS) 身份驗證,則將 Amazon Redshift 設定為使用 AUTHENTICATION NONE 會指示其使用連接埠 9094 進行未驗證的存取。但是,這將會失敗,因為 mTLS 身份驗證正在使用連接埠。因此,我們建議您在使用 mTLS 時切換到 AUTHENTICATION IAM。

  3. 在您的 Amazon Redshift 叢集或Amazon Redshift Serverless 工作群組上啟用增強型 VPC 路由。如需詳細資訊,請參閱啟用增強型 VPC 路由

    注意

    為了擷取 Amazon MSK 啟動程序代理程式 URL,Amazon Redshift 會使用附加的 IAM 角色提供的許可來進行 GetBootstrapBrokersAPI 呼叫。請注意,若要在啟用增強型 VPC 路由時才能成功執行此請求,Amazon Redshift 佈建叢集或 Amazon Redshift 無伺服器工作群組的子網路必須具有 NAT 閘道或網際網路閘道。上述子網路的網路 ACL 和安全群組輸出規則也必須允許存取 Amazon MSK API 服務端點。如需詳細資訊,請參閱適用於 Apache Kafka 端點和配額的 Amazon 受管串流

  4. 在 Amazon Redshift 中,建立外部結構描述以對應至 Amazon MSK 叢集。

    CREATE EXTERNAL SCHEMA MySchema FROM MSK IAM_ROLE { default | 'iam-role-arn' } AUTHENTICATION { none | iam } CLUSTER_ARN 'msk-cluster-arn';

    FROM 子句中,Amazon MSK 表示結構描述會對應來自受管 Kafka 服務的資料。

    當您建立外部結構描述時,Amazon MSK 的串流擷取會提供下列身份驗證類型:

    • none — 指定沒有驗證步驟。

    • iam — 指定 IAM 身份驗證。選擇此選項時,請確保 IAM 角色具有 IAM 身份驗證的許可。

    串流擷取不支援其他 Amazon MSK 身份驗證方法,例如 TLS 驗證或使用者名稱和密碼。

    CLUSTER_ARN 會指定您要從中進行串流處理的 Amazon MSK 叢集。

  5. 建立具體化視觀表以取用主題的資料。如果您不想略過錯誤記錄,請使用類似此範例的 SQL 命令。

    CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS SELECT * FROM MySchema."mytopic";

    下列範例會定義具有 JSON 來源資料的具體化視觀表。請注意,下列檢視會驗證資料是有效的 JSON 和 utf8。Kafka 主題名稱會區分大小寫,且可同時包含大寫和小寫字母。若要從具有大寫名稱的主題擷取,您可以true在資料庫層級enable_case_sensitive_identifier將組態設定為。如需詳細資訊,請參閱名稱和識別碼enable_case_sensitive_identifier

    CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS SELECT kafka_partition, kafka_offset, kafka_timestamp_type, kafka_timestamp, kafka_key, JSON_PARSE(kafka_value) as kafka_data, kafka_headers, refresh_time FROM MySchema."mytopic" WHERE CAN_JSON_PARSE(kafka_value);

    若要開啟自動重新整理,請使用 AUTO REFRESH YES。預設行為是手動重新整理。

    中繼資料資料欄包括下列項目:

    中繼資料資料欄 資料類型 描述
    kafka_partition bigint 從 Kafka 主題記錄的分割區 ID
    kafka_offset bigint Kafka 主題中給定分割區記錄的位移
    kafka_timestamp_type char(1)

    在 Kafka 記錄中使用的時間戳記類型:

    • C - 客戶端上的記錄建立時間 (CREATE_TIME)

    • L - Kafka 服務端上的記錄附加時間 (LOG_APPEND_TIME)

    • U – 記錄建立時間不可用 (NO_TIMESTAMP_TYPE)

    kafka_timestamp 沒有時區的時間戳記 記錄的時間戳記值
    kafka_key varbyte Kafka 記錄的索引鍵
    kafka_value varbyte 從 Kafka 收到的記錄
    kafka_headers super 從 Kafka 收到的記錄標頭
    refresh_time 沒有時區的時間戳記 重新整理的開始時間

    請務必注意,如果您的具體化視圖定義中有業務邏輯,在某些情況下,商務邏輯錯誤可能會導致串流擷取區塊。這可能會導致您必須刪除並重新建立具體化視觀表。為了避免這種情況,我們建議您保持簡單的業務邏輯,並在擷取資料之後對其他資料執行其他邏輯。

  6. 重新整理檢視,這會調用 Amazon Redshift 從主題讀取,並將資料載入具體化視觀表。

    REFRESH MATERIALIZED VIEW MyView;
  7. 查詢具體化視觀表中的資料。

    select * from MyView;

    執行 REFRESH 時,具體化視觀表會直接從主題更新。您可以建立對應至 Kafka 主題資料來源的具體化視觀表。您可以對資料執行篩選和彙總,以做為具體化視觀表定義的一部分。您的串流擷取具體化視觀表 (基礎具體化視觀表) 只能參照一個 Kafka 主題,但是您可以建立與基礎具體化視觀表及其他具體化視觀表或資料表結合的額外具體化視觀表。

如需串流擷取限制的相關資訊,請參閱 串流擷取行為和資料類型