带有配置提供程序的 Debezium 源连接器 - Amazon Managed Streaming for Apache Kafka

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

带有配置提供程序的 Debezium 源连接器

此示例演示了如何将 Debezium MySQL 连接器插件与兼容 MySQL 的 Amazon Aurora 数据库一起用作来源。在此示例中,我们还设置了开源 AWS Secrets Manager 配置提供程序来对 AWS Secrets Manager中的数据库凭证进行外部化。要了解有关配置提供程序的更多信息,请参阅 使用配置提供程序将敏感信息外部化

重要

Debezium MySQL 连接器插件仅支持一项任务,不使用 Amazon MSK Connect 的自动扩缩容量模式。您应该改为使用预置容量模式,并在连接器配置中将 workerCount 设置为 1。要了解有关 MSK Connect 容量模式的更多信息,请参阅 连接器容量

开始前的准备工作

您的连接器必须能够访问互联网,这样它才能与诸如 AWS Secrets Manager 您之外的服务进行交互 Amazon Virtual Private Cloud。本节中的步骤可帮助您完成以下任务以启用互联网访问。

  • 设置托管 NAT 网关并将流量路由到 VPC 中互联网网关的公有子网。

  • 创建将私有子网流量定向到 NAT 网关的默认路由。

有关更多信息,请参阅 为 Amazon MSK Connect 启用互联网访问

先决条件

在启用互联网访问之前,您需要以下项目:

  • 与您的集群关联的 Amazon Virtual Private Cloud (VPC) 的 ID。例如 vpc-123456ab

  • VPC 中的私有子网 ID。例如 subnet-a1b2c3desubnet-f4g5h6ij 等。您必须使用私有子网配置连接器。

为连接器启用互联网访问
  1. 打开 Amazon Virtual Private Cloud 控制台,网址为 https://console.aws.amazon.com/vpc/

  2. 使用描述性名称为您的 NAT 网关创建一个公有子网,并记下子网 ID。有关详细说明,请参阅在 VPC 中创建子网

  3. 创建互联网网关以便您的 VPC 可以与互联网通信,并记下网关 ID。将互联网网关附加到 VPC。有关说明,请参阅创建并附加互联网网关

  4. 预置公有 NAT 网关,以便私有子网中的主机可以访问您的公有子网。创建 NAT 网关时,请选择之前创建的公有子网。有关说明,请参阅创建 NAT 网关

  5. 配置路由表。您总共必须有两个路由表才能完成此设置。您应该已经有一个与您的 VPC 同时自动创建的主路由表。在此步骤中,您需为公有子网创建额外的路由表。

    1. 使用以下设置修改 VPC 的主路由表,以便私有子网将流量路由到您的 NAT 网关。有关说明,请参阅《Amazon Virtual Private Cloud用户指南》中的使用路由表

      私有 MSKC 路由表
      属性
      名称标签 建议您为该路由表指定一个描述性的名称标签,以帮助您识别它。例如私有 MSKC
      关联的子网 您的私有子网
      为 MSK Connect 启用互联网访问的路由
      • 目的地:0.0.0.0/0

      • 目标:您的 NAT 网关 ID。例如 nat-12a345bc6789efg1h

      内部流量的本地路由
      • 目的地:10.0.0.0/16。此值可能会有所不同,具体取决于您 VPC 的 CIDR 块。

      • 目标:本地

    2. 按照创建自定义路由表中的说明,为公有子网创建路由表。创建表时,在名称标签字段中输入描述性名称,以帮助您识别该表与哪个子网关联。例如公有 MSKC

    3. 使用以下设置配置您的公有 MSKC 路由表。

      属性
      名称标签 公有 MSKC 或您选择的其他描述性名称
      关联的子网 带有 NAT 网关的公有子网
      为 MSK Connect 启用互联网访问的路由
      • 目的地:0.0.0.0/0

      • 目标:您的互联网网关 ID。例如 igw-1a234bc5

      内部流量的本地路由
      • 目的地:10.0.0.0/16。此值可能会有所不同,具体取决于您 VPC 的 CIDR 块。

      • 目标:本地

现在,您已经为 Amazon MSK Connect 启用互联网访问,可以创建连接器了。

创建 Debezium 源连接器

  1. 创建自定义插件
    1. Debezium 网站下载 MySQL 连接器插件的最新稳定发行版。记下您下载的 Debezium 发行版(版本 2.x 或较旧的 1.x 系列)。在此程序的后面部分,您需根据您的 Debezium 版本创建连接器。

    2. 下载并解压缩 AWS Secrets Manager 配置提供程序

    3. 将以下档案文件放在同一个目录中:

      • debezium-connector-mysql 文件夹

      • jcusten-border-kafka-config-provider-aws-0.1.1 文件夹

    4. 将您在上一步中创建的目录压缩为 ZIP 文件,然后将该 ZIP 文件上传到 S3 存储桶。有关说明,请参阅《Amazon S3 用户指南》中的上传对象

    5. 复制以下 JSON 并将其粘贴到文件中。例如,debezium-source-custom-plugin.json。将 < example-custom-plugin-name > 替换为您想要的插件名称,将 < arn-of-your-s 3-bucket> 替换为您上传 ZIP 文件的 S3 存储桶的 ARN,以及<file-key-of-ZIP-object>您上传到 S3 的 ZIP 对象的文件密钥。

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. 从保存 JSON 文件的文件夹中运行以下 AWS CLI 命令来创建插件。

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      您应该可以看到类似于以下示例的输出内容。

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. 运行以下命令以检查插件状态。状态应从 CREATING 更改为 ACTIVE。将 ARN 占位符替换为您在上一条命令的输出中获得的 ARN。

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. 为您的数据库凭证配置 AWS Secrets Manager 和创建密钥
    1. 打开 Secrets Manager 控制台,网址为 https://console.aws.amazon.com/secretsmanager/

    2. 创建新密钥来存储您的数据库登录凭证。有关说明,请参阅《AWS Secrets Manager用户指南》中的创建密钥

    3. 复制密钥的 ARN。

    4. 将以下示例策略中的 Secrets Manager 权限添加到您的 服务执行角色。将<arn : aws : secretsmanager : us-east-1:123456789000 : secret : -1234>替换为你的密钥的 ARN。MySecret

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      有关如何添加 IAM 权限的说明,请参阅《IAM 用户指南》中的添加和删除 IAM 身份权限

  3. 使用与配置提供程序有关的信息创建自定义工作程序配置
    1. 将以下工作程序配置属性复制到文件中,将占位符字符串替换为与您的场景对应的值。要了解有关 S AWS ecrets Manager Config Provider 的配置属性的更多信息,请参阅SecretsManagerConfigProvider插件的文档。

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. 运行以下 AWS CLI 命令来创建您的自定义工作器配置。

      替换以下值:

      • < my-worker-config-name >-自定义工作器配置的描述性名称

      • < encoded-properties-file-content-string>-您在上一步中复制的纯文本属性的base64编码版本

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. 创建连接器
    1. 复制与 Debezium 版本(2.x 或 1.x)相对应的以下 JSON,并将其粘贴到新文件中。将 <placeholder> 字符串替换为与您的场景对应的值。有关如何设置服务执行角色的信息,请参阅 MSK Connect 的 IAM 角色和策略

      请注意,该配置使用诸如 ${secretManager:MySecret-1234:dbusername} 之类的变量而不是明文来指定数据库凭证。将 MySecret-1234 替换为密钥名称,然后加入您想要检索的密钥名称。您还必须将 <arn-of-config-provider-worker-configuration> 替换为自定义工作程序配置的 ARN。

      Debezium 2.x

      对于 Debezium 2.x 版本,请复制以下 JSON 并将其粘贴到新文件中。将 <placeholder> 字符串替换为与您的场景对应的值。

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      对于 Debezium 1.x 版本,请复制以下 JSON 并将其粘贴到新文件中。将 <placeholder> 字符串替换为与您的场景对应的值。

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. 在上一步中保存 JSON 文件的文件夹中运行以下 AWS CLI 命令。

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      以下是您在成功运行命令后获得的输出示例。

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

有关包含详细步骤的 Debezium 连接器示例,请参阅 Introducing Amazon MSK Connect - Stream Data to and from Your Apache Kafka Clusters Using Managed Connectors