選取您的 Cookie 偏好設定

我們使用提供自身網站和服務所需的基本 Cookie 和類似工具。我們使用效能 Cookie 收集匿名統計資料,以便了解客戶如何使用我們的網站並進行改進。基本 Cookie 無法停用,但可以按一下「自訂」或「拒絕」以拒絕效能 Cookie。

如果您同意,AWS 與經核准的第三方也會使用 Cookie 提供實用的網站功能、記住您的偏好設定,並顯示相關內容,包括相關廣告。若要接受或拒絕所有非必要 Cookie,請按一下「接受」或「拒絕」。若要進行更詳細的選擇,請按一下「自訂」。

建立 Debezium 來源連接器

焦點模式
建立 Debezium 來源連接器 - Amazon Managed Streaming for Apache Kafka

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

此程序說明如何建立 Debezium 來源連接器。

  1. 建立自訂外掛程式
    1. Debezium 網站下載 MySQL 連線器外掛程式的最新穩定版本。請記下您下載的 Debezium 發行版本 (版本 2.x 或較舊的 1.x 系列)。稍後在此程序中,您將根據您的 Debezium 版本建立連接器。

    2. 下載並解壓縮 AWS Secrets Manager Config Provider

    3. 將以下封存放入相同的目錄中:

      • debezium-connector-mysql 資料夾

      • jcusten-border-kafka-config-provider-aws-0.1.1 資料夾

    4. 將您在上一個步驟中建立的目錄壓縮為 ZIP 檔案,然後將該 ZIP 檔案上傳至 S3 儲存貯體。如需說明,請參閱《Amazon S3 使用者指南》中的上傳物件

    5. 複製以下 JSON 並貼到檔案中。例如:debezium-source-custom-plugin.json。將 <example-custom-plugin-name> 取代為您希望外掛程式擁有的名稱,將 <amzn-s3-demo-bucket-arn> 取代為您上傳 ZIP 檔案的 Amazon S3 儲存貯體的 ARN,將 <file-key-of-ZIP-object>取代為您上傳到 S3 的 ZIP 物件的檔案金鑰。

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<amzn-s3-demo-bucket-arn>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. 從您儲存 JSON 檔案的資料夾執行下列 AWS CLI 命令,以建立外掛程式。

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      您應該會看到類似以下範例的輸出。

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. 執行以下命令來檢查外掛程式狀態。狀態應從 CREATING 變更為 ACTIVE。使用您在上一個命令輸出中獲得的 ARN 來取代 ARN 預留位置。

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. 設定 AWS Secrets Manager 和建立資料庫登入資料的秘密
    1. 前往以下位置開啟機密管理員控制台:https://console.aws.amazon.com/secretsmanager/

    2. 建立新秘密以存放您的資料庫登入憑證。如需說明,請參閱《AWS Secrets Manager使用者指南》中的建立秘密

    3. 複製您秘密的 ARN。

    4. 將以下範例政策中的 Secrets Manager 許可新增至您的 了解服務執行角色。使用您秘密的 ARN 來取代 <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      如需有關如何新增 IAM 許可的說明,請參閱《IAM 使用者指南》中的新增和移除 IAM 身分許可

  3. 使用您組態供應商的資訊來建立自訂工作程序組態
    1. 將以下工作程序組態屬性複製到檔案中,並使用對應至您案例的值來取代預留位置字串。如需有關 AWS Secrets Manager Config Provider 組態屬性的詳細資訊,請參閱外掛程式文件中的 SecretsManagerConfigProvider

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. 執行下列 AWS CLI 命令來建立您的自訂工作者組態。

      取代以下的值:

      • <my-worker-config-name> - 自訂工作程序組態的描述性名稱

      • <encoded-properties-file-content-string> - 您在上一個步驟中複製的 base64 編碼版本純文字屬性。

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. 建立連接器
    1. 複製以下對應至您 Debezium 版本 (2.x 或 1.x) 的 JSON,然後將其貼至新檔案中。使用對應至您案例的值來取代 <placeholder> 字串。如需有關如何設定服務執行角色的詳細資訊,請參閱 MSK Connect 的 IAM 角色和政策

      請注意,組態會使用類似 ${secretManager:MySecret-1234:dbusername} 的變數 (而非純文字) 來指定資料庫憑證。使用您秘密的名稱來取代 MySecret-1234,然後加入您要擷取的索引鍵名稱。您也必須使用自訂工作程序組態的 ARN 來取代 <arn-of-config-provider-worker-configuration>

      Debezium 2.x

      針對 Debezium 2.x 版本,請複製以下 JSON 並將其貼至新檔案中。使用對應至您案例的值來取代 <placeholder> 字串。

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      針對 Debezium 1.x 版本,請複製以下 JSON 並將其貼至新檔案中。使用對應至您案例的值來取代 <placeholder> 字串。

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }

      針對 Debezium 2.x 版本,請複製以下 JSON 並將其貼至新檔案中。使用對應至您案例的值來取代 <placeholder> 字串。

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. 在您在上一個步驟中儲存 JSON 檔案的資料夾中執行下列 AWS CLI 命令。

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      以下是成功執行命令時所得到的輸出範例。

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }
隱私權網站條款Cookie 偏好設定
© 2025, Amazon Web Services, Inc.或其附屬公司。保留所有權利。