使用阿帕奇卡夫卡作為目標 AWS Database Migration Service - AWS Database Migration Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用阿帕奇卡夫卡作為目標 AWS Database Migration Service

您可以使用 AWS DMS 將資料遷移到 Apache 卡夫卡叢集。Apache Kafka 是一個分散式串流平台。您可以使用 Apache Kafka 來即時擷取和處理串流資料。

AWS 還為阿帕奇卡夫卡(Amazon MSK)提供 Amazon 託管流媒體以用作目標。 AWS DMS Amazon MSK 是一個完全受管的 Apache Kafka 串流服務,用於簡化 Apache Kafka 執行個體的實作和管理。它可與開放原始碼 Apache Kafka 版本搭配使用,而且您可以將 Amazon MSK 執行個體存取為 AWS DMS 目標,就像任何 Apache Kafka 執行個體一樣。如需詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的什麼是 Amazon MSK

Kafka 叢集會記錄串流儲存在稱為主題的類別裡,其中具有分割區。分割區是主題中資料記錄 (訊息) 的唯一識別序列。分割區可以分佈在叢集中的多個中介裝置,以啟用主題記錄的平行處理。如需有關主題和分割區及其在 Apache Kafka 中分佈的詳細資訊,請參閱主題和日誌以及分佈

Kafka 叢集可以是 Amazon MSK 執行個體、Amazon EC2 執行個體上執行的叢集,或是內部部署叢集。Amazon MSK 執行個體或 Amazon EC2 執行個體上的叢集可以位於相同或不同的 VPC。如果您的叢集為內部部署,您可以針對複寫執行個體使用自己的內部部署名稱伺服器來解析叢集的主機名稱。如需為複寫執行個體設定名稱伺服器的相關資訊,請參閱 使用自己的內部部署名稱伺服器。如需設定網路的詳細資訊,請參閱 設定複寫執行個體的網路

使用 Amazon MSK 叢集時,請確定其安全群組允許來自複寫執行個體的存取。如需變更 Amazon MSK 叢集安全群組的相關資訊,請參閱變更 Amazon MSK 叢集的安全群組

AWS Database Migration Service 使用 JSON 將記錄發佈到卡夫卡主題。在轉換過程中, AWS DMS 會將來源資料庫中的每筆記錄序列化為 JSON 格式的屬性值對。

若要將資料從任何受支援的資料來源遷移到目標 Kafka 叢集,可以使用物件對應。您可以使用物件映射來決定如何建構目標主題中的資料記錄。您也可以定義每份資料表的分割區索引鍵,Apache Kafka 用它將資料分組為分割區。

目前, AWS DMS 支援每個工作的單一主題。對於具有多個資料表的單一任務,所有訊息都會傳送至同一主題。每則訊息都包含識別目標結構描述和表格的中繼資料區段。 AWS DMS 版本 3.4.6 及更高版本支援使用物件對應的多主題複製。如需詳細資訊,請參閱 使用物件映射的多主題複製

Apache Kafka 端點設定

您可以透過主控台中的端點設定或 AWS DMS CLI 中的--kafka-settings選項來指定連線詳細資料。每個設定的需求如下:

  • Broker — 以逗號分隔清單的形式指定 Kafka 叢集中一個或多個代理程式的位置。broker-hostname:port例如,"ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"。此設定可以指定叢集中任意或所有代理程式的位置。叢集中介裝置全部都會進行通訊,以處理遷移到主題的資料記錄分割。

  • Topic – (選擇性) 指定主題名稱,長度上限為 255 個字母和符號。您可以使用句號 (.)、底線 (_) 和減號 (-)。具有句點 (.) 或底線 (_) 的主題名稱可能會在內部資料結構中發生衝突。在主題名稱中使用其中一個符號,不要同時使用這兩個符號。如果您未指定主題名稱,則 AWS DMS 會使用"kafka-default-topic"作為移轉主題。

    注意

    若要 AWS DMS 建立您指定的移轉主題或預設主題,請設定auto.create.topics.enable = true為 Kafka 叢集配置的一部分。如需更多資訊,請參閱 使用阿帕奇卡夫卡作為目標時的限制 AWS Database Migration Service

  • MessageFormat – 在端點建立之記錄的輸出格式。訊息格式為 JSON (預設) 或 JSON_UNFORMATTED (不含製表符的單行)。

  • MessageMaxBytes – 在端點上建立之記錄的大小上限 (以位元組為單位)。預設值為 1,000,000。

    注意

    您只能使用 CLI AWS I/SDK 變更為非預MessageMaxBytes設值。例如,若要修改現有的 Kafka 端點並變更 MessageMaxBytes,請使用以下命令。

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails – 提供來源資料庫的詳細交易資訊。此資訊包括遞交時間戳記、記錄位置,以及 transaction_idprevious_transaction_id,和 transaction_record_id (交易內的記錄位移) 的值。預設值為 false

  • IncludePartitionValue – 在 Kafka 訊息輸出中顯示分割區值,除非分割區類型為 schema-table-type。預設值為 false

  • PartitionIncludeSchemaTable – 當磁碟分割類型為 primary-key-type 時,將結構描述和資料表名稱作為磁碟分割值的前綴。這樣做會增加 Kafka 分割區之間的資料分佈。例如,假設 SysBench 結構描述有數千個資料表,而每個資料表的主索引鍵只有有限的範圍。在這種情況下,相同的主金鑰會從數千個資料表發送到相同的分割區,而這將導致調節。預設值為 false

  • IncludeTableAlterOperations – 包含變更控制資料中資料表的任何資料定義語言 (DDL) 操作,例如 rename-tabledrop-tableadd-columndrop-columnrename-column。預設值為 false

  • IncludeControlDetails – 在 Kafka 訊息輸出中顯示資料表定義、欄位定義以及資料表和欄位變更的詳細控制資訊。預設值為 false

  • IncludeNullAndEmpty:包含目標中的 NULL 和空資料欄。預設值為 false

  • SecurityProtocol – 使用 Transport Layer Security (TLS) 設定與 Kafka 目標端點的安全連線。選項包括 ssl-authenticationssl-encryptionsasl-ssl。使用 sasl-ssl 將需要 SaslUsernameSaslPassword

  • SslEndpointIdentificationAlgorithm— 設定憑證的主機名稱驗證。3.5.1 版及更新 AWS DMS 版本支援此設定。選項包括下列項目:

    • NONE:停用用戶端連線中 Broker 的主機名稱驗證。

    • HTTPS:啟用用戶端連線中介的主機名稱驗證。

您可以使用設定來協助提高傳輸速度。由於要執行此操作, AWS DMS 要支援對一個 Apache Kafka 目標叢集執行多執行緒完全載入。 AWS DMS 透過包含下列項目的任務設定以支援此多執行緒:

  • MaxFullLoadSubTasks— 使用此選項可指示要 parallel 載入的來源表格數目上限。 AWS DMS 使用專用子任務將每個表加載到其相應的卡夫卡目標表中。預設值為 8;最大值為 49。

  • ParallelLoadThreads— 使用此選項可指定 AWS DMS 用來將每個資料表載入其 Kafka 目標資料表的執行緒數目。Apache Kafka 目標的最大值為 32。您可以要求提高此上限。

  • ParallelLoadBufferSize – 使用此選項指定平行載入執行緒將資料載入至 Kafka 目標時,緩衝區中存放的記錄數量上限。預設值為 50。最大值為 1000。使用此設定搭配 ParallelLoadThreadsParallelLoadBufferSize,只有在有多個執行緒時才有效。

  • ParallelLoadQueuesPerThread:使用此選項指定每個並行執行緒存取的佇列數目,以便將資料記錄從佇列中取出,並為目標產生批次載入。預設為 1。最多 512 個。

您可以透過調整平行執行緒和大量操作的任務設定,改善 Kafka 端點的變更資料擷取 (CDC) 效能。若要執行此操作,您可以指定並行執行緒數目、每個執行緒的佇列數,以及使用 ParallelApply* 任務設定儲存在緩衝區中的記錄數目。例如,假設您要執行 CDC 載入並平行套用 128 個執行緒。您也想要每個執行緒存取 64 個佇列,且每個緩衝區儲存 50 筆記錄。

若要提升 CDC 效能,請 AWS DMS 支援下列工作設定:

  • ParallelApplyThreads— 指定 CDC 載入期間 AWS DMS 用來將資料記錄推送至 Kafka 目標端點的並行執行緒數目。預設值為零 (0),最大值為 32。

  • ParallelApplyBufferSize – 指定每個緩衝區佇列中存放的最大記錄數目,以便並行執行緒在 CDC 載入期間推送至 Kafka 目標端點。預設值為 100,最大值為 1,000。ParallelApplyThreads 指定多個執行緒時,請使用此選項。

  • ParallelApplyQueuesPerThread – 指定每個執行緒存取的佇列數目,以便從佇列中取出資料記錄,並在 CDC 期間產生 Kafka 端點的批次載入。預設為 1。最多 512 個。

在使用 ParallelApply* 任務設定時,partition-key-type 預設是資料表的 primary-key,而非 schema-name.table-name

使用 Transport Layer Security (TLS) 連線到 Kafka

Kafka 叢集只接受使用 Transport Layer Security (TLS) 的安全連線。透過 DMS,您可以使用以下三個安全通訊協定選項中的任一個來保護 Kafka 端點連線。

SSL 加密 (server-encryption)

用戶端會透過伺服器的憑證來驗證伺服器身分。接著在伺服器和用戶端之間建立加密連線。

SSL 驗證 (mutual-authentication)

伺服器和用戶端透過自己的憑證互相驗證身分。接著在伺服器和用戶端之間建立加密連線。

SASL-SSL (mutual-authentication)

簡單驗證和安全層 (SASL) 方法會以使用者名稱和密碼取代用戶端的憑證,用以驗證用戶端的身分。具體來說,您需要提供伺服器已註冊的使用者名稱和密碼,讓伺服器驗證用戶端的身分。接著在伺服器和用戶端之間建立加密連線。

重要

Apache Kafka 和 Amazon MSK 會接受已解析的憑證。這是 Kafka 和 Amazon MSK 的已知限制。如需詳細資訊,請參閱 Apache Kafka 問題,KAFKA-3700

如果您使用 Amazon MSK,請考慮使用存取控制清單 (ACL) 作為這項已知限制的解決方案。如需使用 ACL 的詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》的「Apache Kafka ACL」一節。

如果您使用的是自我管理的 Kafka 叢集,請參閱 2018 年 10 月 21 日的註解,以取得設定叢集的相關資訊。

使用 SSL 加密搭配 Amazon MSK 或自我管理 Kafka 叢集

您可以使用 SSL 加密來保護與 Amazon MSK 或自我管理 Kafka 叢集的端點連線。當您使用 SSL 加密驗證方法時,用戶端會透過伺服器憑證來驗證伺服器的身分。接著在伺服器和用戶端之間建立加密連線。

使用 SSL 加密連線到 Amazon MSK
  • 建立目標 Kafka 端點時,請使用 ssl-encryption 選項設定安全通訊協定端點設定 (SecurityProtocol)。

    下列 JSON 範例會將安全協議設為 SSL 加密。

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
若要針對自我管理 Kafka 叢集使用 SSL 加密
  1. 如果您在內部部署 Kafka 叢集中使用私有憑證授權機構 (CA),請上傳您的私有 CA 憑證並取得 Amazon Resource Name (ARN)。

  2. 建立目標 Kafka 端點時,請使用 ssl-encryption 選項設定安全通訊協定端點設定 (SecurityProtocol)。下列 JSON 範例會將安全通訊協議設為 ssl-encryption

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. 如果您使用的是私有 CA,請在上述第一步取得的 ARN 中設定 SslCaCertificateArn

使用 SSL 身分驗證

您可以使用 SSL 身分驗證來保護與 Amazon MSK 或自我管理 Kafka 叢集的端點連線。

若要在連線到 Amazon MSK 時使用 SSL 身份驗證進行用戶端身份驗證和加密,請執行下列動作:

  • 準備 Kafka 的私有金鑰和公有憑證。

  • 將憑證上傳到 DMS 憑證管理員:

  • 使用 Kafka 端點設定中指定的對應憑證 ARN 建立 Kafka 目標端點。

若要準備 Amazon MSK 的私有金鑰和公有憑證
  1. 建立 EC2 執行個體並設定用戶端以使用身份驗證,如《Amazon Managed Streaming for Apache Kafka 開發人員指南》中「用戶端身份驗證」一節中的步驟 1 到 9 所述。

    完成這些步驟之後,您將擁有憑證-ARN (儲存在 ACM 中的公用憑證 ARN),以及 kafka.client.keystore.jks 檔案中包含的私有金鑰。

  2. 使用下列命令取得公用憑證,並將憑證複製到 signed-certificate-from-acm.pem 檔案中:

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    該命令會傳回類似於以下範例的資訊。

    {"Certificate": "123", "CertificateChain": "456"}

    然後,請將與 "123" 相等的項目複製到 signed-certificate-from-acm.pem 文件中。

  3. kafka.client.keystore.jks to keystore.p12 中匯入 msk-rsa 金鑰以取得私有金鑰,如下列範例所示。

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. 使用以下命令將 .pem 匯出為 keystore.p12 格式。

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    輸入 PEM 密碼短語訊息隨即出現,並識別用於加密憑證的金鑰。

  5. .pem 檔案中移除 bag 屬性和 key 屬性,以確定第一行的開頭字串如下。

    ---BEGIN ENCRYPTED PRIVATE KEY---
若要將公用憑證和私有金鑰上傳到 DMS 憑證管理員,並測試與 Amazon MSK 的連線
  1. 使用下列命令上傳至 DMS 憑證管理員。

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. 建立 Amazon MSK 目標端點並測試連線,以確保 TLS 身份驗證可正常運作。

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
重要

您可以使用 SSL 驗證來保護與自我管理 Kafka 叢集的連線。在某些情況下,您可能會在內部部署 Kafka 叢集中使用私有憑證授權機構 (CA)。若是如此,請將您的 CA 鏈結、公用憑證和私有金鑰上傳至 DMS 憑證管理員。接著,在建立內部部署 Kafka 目標端點時,在端點設定中使用對應的 Amazon Resource Name (ARN)。

若要準備自我管理 Kafka 叢集的私有金鑰和簽署憑證
  1. 產生金鑰對,如下列範例所示。

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. 產生憑證簽署請求 (CSR)。

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. 使用叢集信任存放區中的 CA 來簽署 CSR。如果您沒有 CA,則可以建立自己的私有 CA。

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. ca-cert 匯入至伺服器信任存放區和金鑰存放區。如果您沒有信任存放區,請使用下列命令建立信任存放區,並將 ca-cert 匯入其中。

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. 簽署憑證。

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. 將簽署的憑證匯入金鑰存放區。

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. 使用以下命令將 on-premise-rsa 金鑰從 kafka.server.keystore.jks 匯入至 keystore.p12

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. 使用以下命令將 .pem 匯出為 keystore.p12 格式。

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. encrypted-private-server-key.pemsigned-certificate.pemca-cert 上傳至 DMS 憑證管理員。

  10. 使用傳回的 ARN 建立端點。

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

使用 SASL-SSL 驗證連線到 Amazon MSK

簡單身分驗證和安全層 (SASL) 方法會透過使用者名稱和密碼來驗證用戶端身分,並在伺服器和用戶端之間建立加密連線。

若要使用 SASL,請先在設定 Amazon MSK 叢集時建立安全的使用者名稱和密碼。如需為 Amazon MSK 叢集設定安全使用者名稱和密碼的說明,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的「為 Amazon MSK 叢集設定 SASL/SCRAM 驗證」。

接著,當您建立 Kafka 目標端點時,請使用 sasl-ssl 選項設定安全通訊協定端點設定 (SecurityProtocol)。您還可以設定 SaslUsernameSaslPassword 選項。請確定這些欄位與您第一次設定 Amazon MSK 叢集時建立的安全使用者名稱和密碼一致,如以下 JSON 範例所示。

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
注意
  • 目前,僅 AWS DMS 支援公共 CA 支援 SASL-SSL。DMS 不支援將 SASL-SSL 搭配由私有 CA 支援的自我管理 Kafka 使用。

  • 對於 SASL SSL 驗證,預設 AWS DMS 支援 SCRAM-SHA-512 機制。 AWS DMS 版本 3.5.0 及更高版本也支持普通機制。若要支援 Plain 機制,請將 KafkaSettings API 資料類型的 SaslMechanism 參數設定為 PLAIN

使用前映像檢視做為目標之 Apache Kafka 的 CDC 列原始值

將 CDC 更新寫入至 Kafka 等資料串流目標時,您可以在更新進行變更前,檢視來源資料庫列的原始值。為了實現這一點,請根據來源資料庫引擎提供的資料, AWS DMS 填入更新事件之前的影像

不同的來源資料庫引擎可提供不同的前映像資訊量:

  • Oracle 僅提供欄更新 (如果有變更的話)。

  • PostgreSQL 僅提供屬於主索引鍵一部分的欄的資料 (無論是否有變更)。如果邏輯複寫正在使用中,且為來源資料表設定 REPLACY IDENTITY FULL,您可以取得列寫入 WALS 前後的完整資訊,並可在此處取得。

  • MySQL 一般會提供所有欄的資料 (無論是否有變更)。

若要啟用前映像功能以從來源資料庫將原始值新增至 AWS DMS 輸出,請使用 BeforeImageSettings 任務設定或 add-before-image-columns 參數。此參數會套用欄轉換規則。

BeforeImageSettings 會使用從來源資料庫系統收集到的值,將新的 JSON 屬性新增至所有更新操作,如下所示。

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
注意

BeforeImageSettings 套用至完全載入以及 CDC 任務 (會遷移現有資料並複寫持續變更),或僅套用至 CDC 任務 (只會複寫資料變更)。請勿將 BeforeImageSettings 套用至僅限完全載入的任務。

針對 BeforeImageSettings 選項,適用的設定如下:

  • EnableBeforeImage 選項設為 true 以啟用前映像功能。預設值為 false

  • 使用 FieldName 選項,將名稱指派給新的 JSON 屬性。若 EnableBeforeImagetrueFieldName 則為必填,且不能留白。

  • ColumnFilter 選項會使用前映像來指定要新增的欄。若只要新增屬於資料表主索引鍵一部分的欄,請使用預設值 pk-only。若只要新增不屬於 LOB 類型的欄,請使用 non-lob。若要新增任何具有前映像值的欄,請使用 all

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

使用前映像轉換規則

您可使用 add-before-image-columns 參數做為任務設定的替代方式,它會套用欄轉換規則。透過此參數,您可以在 Kafka 等資料串流目標上,在 CDC 期間啟用前映像功能。

只要在轉換規則中使用 add-before-image-columns,即可套用前映像結果的更精細的控制。轉換規則可讓您使用物件定位器,以便掌控針對規則選取的資料表。此外,您可以將轉換規則鏈結在一起,讓不同的規則套用至不同的資料表。接著,您可以使用其他規則來操控產生的欄。

注意

請勿在同一個任務內將 add-before-image-columns 參數與 BeforeImageSettings 任務設定一起搭配使用。請改為將參數或設定 (擇一使用,而非兩者同時使用) 用於單一任務。

具有欄的 add-before-image-columns 參數的 transformation 規則類型必須提供 before-image-def 區段。下列顯示一個範例。

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix 的值會附加至欄名稱前,而column-prefix 的預設值為 BI_column-suffix 的值會附加至欄名稱,而預設值為空白。請勿將 column-prefixcolumn-suffix 同時設為空白字串。

column-filter 選擇一個值。若只要新增屬於資料表主索引鍵一部分的欄,請選擇 pk-only。選擇 non-lob 以僅新增不屬於 LOB 類型的欄。或者,選擇 all 以新增任何具有前映像值的欄。

前映像轉換規則範例

下列範例中的轉換規則會在目標中新增名為 BI_emp_no 的欄。因此,UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 等陳述式會以 1 填入 BI_emp_no 欄位。當您將 CDC 更新寫入至 Amazon S3 目標時,BI_emp_no 資料欄會讓它能夠分辨哪個原始列已更新。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

如需使用 add-before-image-columns 規則動作的相關資訊,請參閱 轉換規則與動作

使用阿帕奇卡夫卡作為目標時的限制 AWS Database Migration Service

使用 Apache Kafka 做為目標時,有下列限制:

  • AWS DMS 卡夫卡目標端點不支援適用於 Apache 卡夫卡 (Amazon MSK) 的 Amazon 受管串流的 IAM 存取控制。

  • 不支援完全的 LOB 模式。

  • 為您的叢集指定 Kafka 配置檔案,其內容允許自動 AWS DMS 建立新主題。包括設定,auto.create.topics.enable = true。如果您正在使用 Amazon MSK,您可以在建立 Kafka 叢集時指定預設組態,然後將 auto.create.topics.enable 設定變更為 true。如需預設組態設定的詳細資訊,請參閱《Amazon Managed Streaming for Apache Kafka 開發人員指南》中的「Amazon MSK 預設組態」。如果您需要修改使用 Amazon MSK 建立的現有 Kafka 叢集,請執行 AWS CLI 命令aws kafka create-configuration來更新 Kafka 組態,如下列範例所示:

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    在此,//~/kafka_configuration 是您以必要屬性設定所建立的組態檔案。

    如果您使用自己安裝在 Amazon EC2 上的 Kafka 執行個體,請使用該auto.create.topics.enable = true設定修改 Kafka 叢集組態,以允許 AWS DMS 使用執行個體隨附的選項自動建立新主題。

  • AWS DMS 將每個更新發佈到來源資料庫中的單一記錄,做為指定 Kafka 主題中的一個資料記錄 (訊息),而不論交易為何。

  • AWS DMS 分割區索引鍵支援下列兩種形式:

    • SchemaName.TableName:結構描述和資料表名稱的組合。

    • ${AttributeName}:JSON 其中一個欄位的值,或來源資料庫資料表的主索引鍵。

  • Kafka 端點不支援 BatchApply。對 Kafka 目標使用批次套用 (例如 BatchApplyEnabled 目標中繼資料任務設定) 可能會導致資料遺失。

  • AWS DMS 不支援移轉超過 16 位數的BigInt資料類型值。若要解決此限制,您可以使用下列轉換規則將 BigInt 資料欄轉換為字串。如需轉換規則的詳細資訊,請參閱 轉換規則與動作

    { "rule-type": "transformation", "rule-id": "id", "rule-name": "name", "rule-target": "column", "object-locator": { "schema-name": "valid object-mapping rule action", "table-name": "", "column-name": "" }, "rule-action": "change-data-type", "data-type": { "type": "string", "length": 20 } }

使用物件映射將資料遷移到 Kafka 主題

AWS DMS 使用表格對映規則將資料從來源映射到目標 Kafka 主題。若要將資料映射到目標主題,請使用一種稱為物件映射的資料表映射規則。您可以使用物件映射定義如何將來源中的資料記錄映射到發佈到 Kafka 主題的資料記錄。

除了擁有分割區索引鍵外,Kafka 主題沒有預設結構。

注意

您不一定要使用物件映射。您可以針對各種轉換使用一般資料表映射。不過,分割區索引鍵類型會遵循下列預設行為:

  • 主索引鍵會作為「完全載入」的分割區索引鍵。

  • 如果未使用平行套用任務設定,則 schema.table 會作為 CDC 的分割區索引鍵。

  • 當磁碟分割類型為 時, 會將結構描述和表格名稱作為磁碟分割值的字首。

若要建立物件映射規則,請將 rule-type 指定為 object-mapping。此規則指定您想要使用的物件映射類型。

規則的結構如下。

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS 目前支援map-record-to-recordmap-record-to-document作為參數的唯一有效rule-action值。這些設定會影響 exclude-columns 屬性清單中未排除的值。map-record-to-recordmap-record-to-document值指定依預設 AWS DMS 處理這些記錄的方式。這些值反正不會影響屬性映射。

從關聯式資料庫遷移到 Kafka 主題時,請使用 map-record-to-record。此規則類型使用關聯式資料庫的 taskResourceId.schemaName.tableName 值做為 Kafka 主題的分割區索引鍵,並會為來源資料庫中的每一欄建立一個屬性。

使用 map-record-to-record 時,請注意下列事項:

  • 此設定只會影響 exclude-columns 清單中排除的欄。

  • 對於每個此類列,在目標主題中 AWS DMS 創建一個相應的屬性。

  • AWS DMS 不論是否在屬性對映中使用來源資料行,都會建立此對應屬性。

了解 map-record-to-record 的一種方法是查看它運作時的狀態。在本範例中,假設您開始使用之關聯式資料庫資料表資料列的結構和資料如下。

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

1988/02/29

若要將此資訊從名為 Test 的結構描述遷移至 Kafka 主題,您可以建立規則以將資料映射至目標主題。以下規則說明映射。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

指定 Kafka 主題和分割區索引鍵 (在本例中為 taskResourceId.schemaName.tableName),下列說明使用我們在 Kafka 目標主題中的範例資料所產生的記錄格式:

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

使用屬性映射重組資料

您可以在使用屬性映射將資料遷移到 Kafka 主題的同時重組資料。例如,您可能想要將來源的幾個欄位合併為目標的單一欄位。以下屬性映射說明如何重組資料。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

若要設定 partition-key 的常數值,請指定一個 partition-key 值。例如,您可以執行此操作,將所有資料強制儲存在單一分割區中。以下映射說明此方法。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
注意

用於特定資料表的控制記錄 partition-key 值是 TaskId.SchemaName.TableName。用於特定任務的控制記錄 partition-key 值是該記錄的 TaskId。在物件映射中指定 partition-key 值對控制記錄的 partition-key 沒有影響。

使用物件映射的多主題複製

依預設, AWS DMS 工作會將所有來源資料移轉至下列其中一個 Kafka 主題:

  • 如 AWS DMS 目標端點的「主題」欄位中所指定。

  • 如果目標端點的主題欄位未填寫,且 Kafka auto.create.topics.enable 設定為 true,則為 kafka-default-topic 指定的主題。

對於 3.4.6 及更高版本的 AWS DMS 引擎,您可以使用kafka-target-topic屬性將每個移轉的來源表格對應至單獨的主題。例如,物件映射規則隨後會將來源資料表 CustomerAddress 分別遷移至 Kafka 主題 customer_topicaddress_topic。同時,將所有其他來源資料表 (包括Test結構描述中的Bills表格) AWS DMS 移轉至目標端點中指定的主題。

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

透過使用 Kafka 多主題複寫,您可以使用單一複寫任務將來源資料表來分組和遷移至個別 Kafka 主題。

Apache Kafka 訊息格式

JSON 輸出僅是索引鍵/值對的清單。

RecordType

記錄類型可以是資料或控制。資料記錄代表來源的實際資料列。控制記錄用於串流的重要事件,例如重新啟動任務。

操作

針對資料記錄,操作可以是 loadinsertupdatedelete

針對控制記錄,操作可以是 create-tablerename-tabledrop-tablechange-columnsadd-columndrop-columnrename-columncolumn-type-change

SchemaName

記錄的來源結構描述。控制記錄的此欄位可以為空。

TableName

記錄的來源資料表。控制記錄的此欄位可以為空。

時間戳記

JSON 訊息建構時間的時間戳記。此欄位格式為 ISO 8601 格式。

下列 JSON 訊息範例展示包含所有其他中繼資料的資料類型訊息。

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

下列 JSON 訊息範例展示控制類型訊息。

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }