本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
此程序說明如何建立 Debezium 來源連接器。
建立自訂外掛程式
從 Debezium
網站下載 MySQL 連線器外掛程式的最新穩定版本。請記下您下載的 Debezium 發行版本 (版本 2.x 或較舊的 1.x 系列)。稍後在此程序中,您將根據您的 Debezium 版本建立連接器。 -
將以下封存放入相同的目錄中:
-
debezium-connector-mysql
資料夾 -
jcusten-border-kafka-config-provider-aws-0.1.1
資料夾
-
-
將您在上一個步驟中建立的目錄壓縮為 ZIP 檔案,然後將該 ZIP 檔案上傳至 S3 儲存貯體。如需說明,請參閱《Amazon S3 使用者指南》中的上傳物件。
-
複製以下 JSON 並貼到檔案中。例如:
debezium-source-custom-plugin.json
。將<example-custom-plugin-name>
取代為您希望外掛程式擁有的名稱,將<amzn-s3-demo-bucket-arn>
取代為您上傳 ZIP 檔案的 Amazon S3 儲存貯體的 ARN,將
取代為您上傳到 S3 的 ZIP 物件的檔案金鑰。<file-key-of-ZIP-object>
{ "name": "
<example-custom-plugin-name>
", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<amzn-s3-demo-bucket-arn>
", "fileKey": "<file-key-of-ZIP-object>
" } } } -
從您儲存 JSON 檔案的資料夾執行下列 AWS CLI 命令,以建立外掛程式。
aws kafkaconnect create-custom-plugin --cli-input-json file://
<debezium-source-custom-plugin.json>
您應該會看到類似以下範例的輸出。
{ "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
-
執行以下命令來檢查外掛程式狀態。狀態應從
CREATING
變更為ACTIVE
。使用您在上一個命令輸出中獲得的 ARN 來取代 ARN 預留位置。aws kafkaconnect describe-custom-plugin --custom-plugin-arn "
<arn-of-your-custom-plugin>
"
設定 AWS Secrets Manager 和建立資料庫登入資料的秘密
-
前往以下位置開啟機密管理員控制台:https://console.aws.amazon.com/secretsmanager/
。 -
建立新秘密以存放您的資料庫登入憑證。如需說明,請參閱《AWS Secrets Manager使用者指南》中的建立秘密。
-
複製您秘密的 ARN。
-
將以下範例政策中的 Secrets Manager 許可新增至您的 了解服務執行角色。使用您秘密的 ARN 來取代
<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>
。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "
<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>
" ] } ] }如需有關如何新增 IAM 許可的說明,請參閱《IAM 使用者指南》中的新增和移除 IAM 身分許可。
-
使用您組態供應商的資訊來建立自訂工作程序組態
-
將以下工作程序組態屬性複製到檔案中,並使用對應至您案例的值來取代預留位置字串。如需有關 AWS Secrets Manager Config Provider 組態屬性的詳細資訊,請參閱外掛程式文件中的 SecretsManagerConfigProvider
。 key.converter=
<org.apache.kafka.connect.storage.StringConverter>
value.converter=<org.apache.kafka.connect.storage.StringConverter>
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
-
執行下列 AWS CLI 命令來建立您的自訂工作者組態。
取代以下的值:
-
<my-worker-config-name>
- 自訂工作程序組態的描述性名稱 -
<encoded-properties-file-content-string>
- 您在上一個步驟中複製的 base64 編碼版本純文字屬性。
aws kafkaconnect create-worker-configuration --name
<my-worker-config-name>
--properties-file-content<encoded-properties-file-content-string>
-
-
建立連接器
-
複製以下對應至您 Debezium 版本 (2.x 或 1.x) 的 JSON,然後將其貼至新檔案中。使用對應至您案例的值來取代
字串。如需有關如何設定服務執行角色的詳細資訊,請參閱 MSK Connect 的 IAM 角色和政策。<placeholder>
請注意,組態會使用類似
${secretManager:MySecret-1234:dbusername}
的變數 (而非純文字) 來指定資料庫憑證。使用您秘密的名稱來取代
,然後加入您要擷取的索引鍵名稱。您也必須使用自訂工作程序組態的 ARN 來取代MySecret-1234
。<arn-of-config-provider-worker-configuration>
針對 Debezium 2.x 版本,請複製以下 JSON 並將其貼至新檔案中。使用對應至您案例的值來取代
<placeholder>
字串。{ "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "
<aurora-database-writer-instance-endpoint>
", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>
", "database.password": "<${secretManager:MySecret-1234:dbpassword}>
", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>
", "topic.prefix": "<logical-name-of-database-server>
", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>
", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>
", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>
", "vpc": { "subnets": [ "<cluster-subnet-1>
", "<cluster-subnet-2>
", "<cluster-subnet-3>
" ], "securityGroups": ["<id-of-cluster-security-group>
"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>
", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>
", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>
", "revision": 1 } } -
在您在上一個步驟中儲存 JSON 檔案的資料夾中執行下列 AWS CLI 命令。
aws kafkaconnect create-connector --cli-input-json file://connector-info.json
以下是成功執行命令時所得到的輸出範例。
{ "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }
-