本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
带有配置提供程序的 Debezium 源连接器
此示例演示了如何将 Debezium MySQL 连接器插件与兼容 MySQL 的 Amazon Aurora
重要
开始前的准备工作
您的连接器必须能够访问互联网,这样它才能与诸如 AWS Secrets Manager 您之外的服务进行交互 Amazon Virtual Private Cloud。本节中的步骤可帮助您完成以下任务以启用互联网访问。
设置托管 NAT 网关并将流量路由到 VPC 中互联网网关的公有子网。
创建将私有子网流量定向到 NAT 网关的默认路由。
有关更多信息,请参阅 为 Amazon MSK Connect 启用互联网访问。
先决条件
在启用互联网访问之前,您需要以下项目:
与您的集群关联的 Amazon Virtual Private Cloud (VPC) 的 ID。例如 vpc-123456ab。
VPC 中的私有子网 ID。例如 subnet-a1b2c3de、subnet-f4g5h6ij 等。您必须使用私有子网配置连接器。
为连接器启用互联网访问
-
打开 Amazon Virtual Private Cloud 控制台,网址为 https://console.aws.amazon.com/vpc/
。 -
使用描述性名称为您的 NAT 网关创建一个公有子网,并记下子网 ID。有关详细说明,请参阅在 VPC 中创建子网。
-
创建互联网网关以便您的 VPC 可以与互联网通信,并记下网关 ID。将互联网网关附加到 VPC。有关说明,请参阅创建并附加互联网网关。
-
预置公有 NAT 网关,以便私有子网中的主机可以访问您的公有子网。创建 NAT 网关时,请选择之前创建的公有子网。有关说明,请参阅创建 NAT 网关。
-
配置路由表。您总共必须有两个路由表才能完成此设置。您应该已经有一个与您的 VPC 同时自动创建的主路由表。在此步骤中,您需为公有子网创建额外的路由表。
-
使用以下设置修改 VPC 的主路由表,以便私有子网将流量路由到您的 NAT 网关。有关说明,请参阅《Amazon Virtual Private Cloud用户指南》中的使用路由表。
私有 MSKC 路由表属性 值 名称标签 建议您为该路由表指定一个描述性的名称标签,以帮助您识别它。例如私有 MSKC。 关联的子网 您的私有子网 为 MSK Connect 启用互联网访问的路由 -
目的地:0.0.0.0/0
-
目标:您的 NAT 网关 ID。例如 nat-12a345bc6789efg1h。
内部流量的本地路由 -
目的地:10.0.0.0/16。此值可能会有所不同,具体取决于您 VPC 的 CIDR 块。
-
目标:本地
-
-
按照创建自定义路由表中的说明,为公有子网创建路由表。创建表时,在名称标签字段中输入描述性名称,以帮助您识别该表与哪个子网关联。例如公有 MSKC。
-
使用以下设置配置您的公有 MSKC 路由表。
属性 值 名称标签 公有 MSKC 或您选择的其他描述性名称 关联的子网 带有 NAT 网关的公有子网 为 MSK Connect 启用互联网访问的路由 -
目的地:0.0.0.0/0
-
目标:您的互联网网关 ID。例如 igw-1a234bc5。
内部流量的本地路由 -
目的地:10.0.0.0/16。此值可能会有所不同,具体取决于您 VPC 的 CIDR 块。
-
目标:本地
-
-
现在,您已经为 Amazon MSK Connect 启用互联网访问,可以创建连接器了。
创建 Debezium 源连接器
创建自定义插件
从 Debezium
网站下载 MySQL 连接器插件的最新稳定发行版。记下您下载的 Debezium 发行版(版本 2.x 或较旧的 1.x 系列)。在此程序的后面部分,您需根据您的 Debezium 版本创建连接器。 -
下载并解压缩 AWS Secrets Manager 配置提供程序
。 -
将以下档案文件放在同一个目录中:
-
debezium-connector-mysql
文件夹 -
jcusten-border-kafka-config-provider-aws-0.1.1
文件夹
-
-
将您在上一步中创建的目录压缩为 ZIP 文件,然后将该 ZIP 文件上传到 S3 存储桶。有关说明,请参阅《Amazon S3 用户指南》中的上传对象。
-
复制以下 JSON 并将其粘贴到文件中。例如,
debezium-source-custom-plugin.json
。将< example-custom-plugin-name >
替换为您想要的插件名称,将< arn-of-your-s 3-bucket>
替换为您上传 ZIP 文件的 S3 存储桶的 ARN,以及
您上传到 S3 的 ZIP 对象的文件密钥。<file-key-of-ZIP-object>
{ "name": "
<example-custom-plugin-name>
", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>
", "fileKey": "<file-key-of-ZIP-object>
" } } } -
从保存 JSON 文件的文件夹中运行以下 AWS CLI 命令来创建插件。
aws kafkaconnect create-custom-plugin --cli-input-json file://
<debezium-source-custom-plugin.json>
您应该可以看到类似于以下示例的输出内容。
{ "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
-
运行以下命令以检查插件状态。状态应从
CREATING
更改为ACTIVE
。将 ARN 占位符替换为您在上一条命令的输出中获得的 ARN。aws kafkaconnect describe-custom-plugin --custom-plugin-arn "
<arn-of-your-custom-plugin>
"
为您的数据库凭证配置 AWS Secrets Manager 和创建密钥
-
打开 Secrets Manager 控制台,网址为 https://console.aws.amazon.com/secretsmanager/
。 -
创建新密钥来存储您的数据库登录凭证。有关说明,请参阅《AWS Secrets Manager用户指南》中的创建密钥。
-
复制密钥的 ARN。
-
将以下示例策略中的 Secrets Manager 权限添加到您的 服务执行角色。将
<arn : aws : secretsmanager : us-east-1:123456789000 : secret : -1234>
替换为你的密钥的 ARN。MySecret{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "
<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>
" ] } ] }有关如何添加 IAM 权限的说明,请参阅《IAM 用户指南》中的添加和删除 IAM 身份权限。
-
使用与配置提供程序有关的信息创建自定义工作程序配置
-
将以下工作程序配置属性复制到文件中,将占位符字符串替换为与您的场景对应的值。要了解有关 S AWS ecrets Manager Config Provider 的配置属性的更多信息,请参阅SecretsManagerConfigProvider
插件的文档。 key.converter=
<org.apache.kafka.connect.storage.StringConverter>
value.converter=<org.apache.kafka.connect.storage.StringConverter>
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
-
运行以下 AWS CLI 命令来创建您的自定义工作器配置。
替换以下值:
-
< my-worker-config-name >
-自定义工作器配置的描述性名称 -
< encoded-properties-file-content-string>
-您在上一步中复制的纯文本属性的base64编码版本
aws kafkaconnect create-worker-configuration --name
<my-worker-config-name>
--properties-file-content<encoded-properties-file-content-string>
-
-
创建连接器
-
复制与 Debezium 版本(2.x 或 1.x)相对应的以下 JSON,并将其粘贴到新文件中。将
字符串替换为与您的场景对应的值。有关如何设置服务执行角色的信息,请参阅 MSK Connect 的 IAM 角色和策略。<placeholder>
请注意,该配置使用诸如
${secretManager:MySecret-1234:dbusername}
之类的变量而不是明文来指定数据库凭证。将
替换为密钥名称,然后加入您想要检索的密钥名称。您还必须将MySecret-1234
替换为自定义工作程序配置的 ARN。<arn-of-config-provider-worker-configuration>
-
在上一步中保存 JSON 文件的文件夹中运行以下 AWS CLI 命令。
aws kafkaconnect create-connector --cli-input-json file://connector-info.json
以下是您在成功运行命令后获得的输出示例。
{ "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }
-
有关包含详细步骤的 Debezium 连接器示例,请参阅 Introducing Amazon MSK Connect - Stream Data to and from Your Apache Kafka Clusters Using Managed Connectors