具有組態供應商的 Debezium 來源連接器 - Amazon Managed Streaming for Apache Kafka

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

具有組態供應商的 Debezium 來源連接器

這個範例說明如何使用我的SQL相容 Amazon Aurora 資料庫做為來源的取消我的SQL連接器外掛程式。在此範例中,我們也會設定開放原始碼 AWS Secrets Manager Config Provider,以在 AWS Secrets Manager中將資料庫憑證外部化。如需有關組態供應商的詳細資訊,請參閱 使用組態供應商來外部化敏感資訊

重要

Debezium 我的SQL連接器外掛程式僅支援一項任務,不適用於 Amazon Connect 的自動調整容量模式。MSK您應改用佈建容量模式,並在連接器組態中將 workerCount 設定為等於一。若要深入瞭解 MSK Connect 的容量模式,請參閱連接器容量

開始之前

您的連接器必須能夠存取網際網路,以便它可以與您以 AWS Secrets Manager 外的服務互動 Amazon Virtual Private Cloud。本節中的步驟有助您完成以下任務,以啟用網際網路存取。

  • 設定託管NAT閘道的公用子網路,並將流量路由到您的VPC.

  • 建立預設路由,將您的私有子網路流量導向至NAT閘道。

如需詳細資訊,請參閱 啟用 Amazon MSK Connect 的網際網路存取

先決條件

您需先準備好以下事項,才可啟用網際網路存取:

  • 與叢集相關聯的 Amazon Virtual Private Cloud (VPC) 識別碼。例如,vpc-123456ab

  • 您IDs的私人子網路中的VPC. 例如,subnet-a1b2c3desubnet-f4g5h6ij 等。您必須使用私有子網路來設定連接器。

啟用連接器的網際網路存取
  1. 在開啟 Amazon Virtual Private Cloud 主控台https://console.aws.amazon.com/vpc/

  2. 使用描述性名稱為NAT閘道建立公用子網路,並記下子網路 ID。如需詳細指示,請參閱在 VPC.

  3. 建立網際網路閘道,以便您VPC可以與網際網路通訊,並記下閘道 ID。將網際網路閘道連接至您的VPC. 如需說明,請參閱建立並連接網際網路閘道

  4. 佈建公用NAT閘道,讓您的私有子網路中的主機可以連線到您的公用子網路。建立NAT閘道時,請選取您先前建立的公用子網路。如需指示,請參閱建立NAT閘道

  5. 設定路由表。您總共須有兩個路由表才能完成此設定。您應該已經有一個主路由表,該主路由表是在同一時間自動創建的VPC. 在此步驟中,您可為公有子網路建立額外的路由表。

    1. 使用下列設定來修改您VPC的主路由表,讓您的私人子網路將流量路由到NAT閘道。如需說明,請參閱《Amazon Virtual Private Cloud使用者指南》中的使用路由表

      私人MSKC路由表
      屬性 Value
      Name tag (名稱標籤) 我們建議您為此路由表提供描述性名稱標籤,有助於您識別。例如,私人MSKC
      相關聯的子網路 您的私有子網路
      為 Connect 啟用網際網路存取的MSK路由
      • 目的地:0.0.0.0/0

      • 目標:您的NAT閘道 ID。例如,nat-12a345bc6789efg1h

      適用於內部流量的本機路由
      • 目的地:10.0.0.0/16。此值可能會因您VPC的CIDR區塊而有所不同。

      • 目標:本機

    2. 依照建立自訂路由表中的說明,建立公有子網路的路由表。在建立該表時,請在名稱標籤欄位中輸入描述性名稱,以協助您識別與該表相關聯的子網路。例如,「公開 MSKC」。

    3. 使用下列設定來設定您的用MSKC路由表。

      屬性 Value
      Name tag (名稱標籤) 公開MSKC或您選擇的不同描述性名稱
      相關聯的子網路 您的公有子網路與NAT閘道
      為 Connect 啟用網際網路存取的MSK路由
      • 目的地:0.0.0.0/0

      • 目標:您的網際網路閘道 ID。例如,igw-1a234bc5

      適用於內部流量的本機路由
      • 目的地:10.0.0.0/16。此值可能會因您VPC的CIDR區塊而有所不同。

      • 目標:本機

現在您已經為 Amazon MSK Connect 啟用網際網路存取,您就可以建立連接器了。

建立 Debezium 來源連接器

  1. 建立自訂外掛程式
    1. Debezi um 網站下載最新穩定版本的「我的SQL連接器」外掛程式。請記下您下載的 Debezium 發行版本 (版本 2.x 或較舊的 1.x 系列)。稍後在此程序中,您將根據您的 Debezium 版本建立連接器。

    2. 下載並解壓縮 AWS Secrets Manager Config Provider

    3. 將以下封存放入相同的目錄中:

      • debezium-connector-mysql 資料夾

      • jcusten-border-kafka-config-provider-aws-0.1.1 資料夾

    4. 將您在上一步中建立的目錄壓縮到ZIP檔案中,然後將ZIP檔案上傳到 S3 儲存貯體。如需說明,請參閱《Amazon S3 使用者指南》中的上傳物件

    5. 複製以下內容JSON並將其粘貼到文件中。例如 debezium-source-custom-plugin.json。Replace (取代) <example-custom-plugin-name> 使用您希望插件具有的名稱,<arn-of-your-s3-bucket> 與您上傳ZIP文件ARN的 S3 存儲桶,以及<file-key-of-ZIP-object>您上傳到 S3 的ZIP對象的文件密鑰。

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. 從儲存JSON檔案的資料夾執行下列 AWS CLI 命令,以建立外掛程式。

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      您應該會看到類似以下範例的輸出。

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. 執行以下命令來檢查外掛程式狀態。狀態應從 CREATING 變更為 ACTIVE。將ARN佔位符替換為您ARN在上一個命令輸出中獲得的佔位符。

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. 設定 AWS Secrets Manager 和建立資料庫認證的密碼
    1. 開啟 Secrets Manager 主控台,位於https://console.aws.amazon.com/secretsmanager/

    2. 建立新秘密以存放您的資料庫登入憑證。如需說明,請參閱《AWS Secrets Manager使用者指南》中的建立秘密

    3. 複製你的秘密ARN。

    4. 將以下範例政策中的 Secrets Manager 許可新增至您的 服務執行角色。Replace (取代) <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> 與你ARN的秘密。

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      如需有關如何新增IAM權限的指示,請參閱IAM使用指南中的新增和移除IAM身分權限

  3. 使用您組態供應商的資訊來建立自訂工作程序組態
    1. 將以下工作程序組態屬性複製到檔案中,並使用對應至您案例的值來取代預留位置字串。若要進一步了解 AWS Secrets Manager 組態提供者的組態屬性,請參閱外掛程SecretsManagerConfigProvider式的說明文件中的。

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. 執行下列 AWS CLI 命令以建立您的自訂 Worker 組態。

      取代以下的值:

      • <my-worker-config-name> -自訂 Worker 組態的描述性名稱

      • <encoded-properties-file-content-string> -您在上一個步驟中複製的純文字內容的 base64 編碼版本

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. 建立連接器
    1. 複製與您JSON的 Debezium 版本(2.x 或 1.x)相對應的以下內容,然後將其粘貼到新文件中。使用對應至您案例的值來取代 <placeholder> 字串。如需有關如何設定服務執行角色的詳細資訊,請參閱 MSK Connect 的 IAM 角色和政策

      請注意,組態會使用類似 ${secretManager:MySecret-1234:dbusername} 的變數 (而非純文字) 來指定資料庫憑證。使用您秘密的名稱來取代 MySecret-1234,然後加入您要擷取的索引鍵名稱。您也必須取<arn-of-config-provider-worker-configuration>代為ARN自訂 Worker 組態。

      Debezium 2.x

      對於 Debezium 2.x 版本,請複製以下內容JSON並將其粘貼到新文件中。更換 <placeholder> 字符串與您的方案相對應的值。

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      對於 Debezium 1.x 版本,請複製以下內容JSON並將其粘貼到新文件中。更換 <placeholder> 字符串與您的方案相對應的值。

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. 在上一個步 AWS CLI 驟中儲存JSON檔案的資料夾中執行下列命令。

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      以下是成功執行命令時所得到的輸出範例。

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

如需具有詳細步驟的 Debezium 連接器範例,請參閱簡介 Amazon MSK Connect-使用受管連接器在 Apache Kafka 叢集之間進行串流資料