Conector de origen Debezium con proveedor de configuración - Transmisión gestionada de Amazon para Apache Kafka

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Conector de origen Debezium con proveedor de configuración

En este ejemplo se muestra cómo utilizar el complemento Debezium My SQL connector con una base de datos SQL Amazon Aurora compatible con My como origen. En este ejemplo, también configuramos el proveedor de código abierto AWS Secrets Manager Config Provider para externalizar las credenciales de la base de datos en AWS Secrets Manager. Para obtener más información sobre los proveedores de configuración, consulte Externalización de información confidencial mediante proveedores de configuración.

importante

El complemento Debezium My SQL connector solo admite una tarea y no funciona con el modo de capacidad con escalado automático de Amazon Connect. MSK En su lugar, debe utilizar el modo de capacidad aprovisionada y establecer workerCount igual a uno en la configuración del conector. Para obtener más información sobre los modos de capacidad de MSK Connect, consulteCapacidad de conector.

Antes de empezar

El conector debe poder acceder a Internet para poder interactuar con servicios como los AWS Secrets Manager que están fuera del suyo Amazon Virtual Private Cloud. Los pasos de esta sección le ayudan a realizar las siguientes tareas para habilitar el acceso a Internet.

  • Configure una subred pública que aloje una NAT puerta de enlace y dirija el tráfico a una puerta de enlace de Internet de su VPC propiedad.

  • Cree una ruta predeterminada que dirija el tráfico de su subred privada a su NAT puerta de enlace.

Para obtener más información, consulte Habilitación del acceso a Internet para Amazon MSK Connect.

Requisitos previos 

Antes de habilitar el acceso a Internet, necesitará lo siguiente:

  • El ID del Amazon Virtual Private Cloud (VPC) asociado a tu clúster. Por ejemplo, vpc-123456ab.

  • La IDs de las subredes privadas de su. VPC Por ejemplo, subnet-a1b2c3de, subnet-f4g5h6ij, etc. Debe configurar el conector con subredes privadas.

Habilitación del acceso a Internet para su conector
  1. Abra la Amazon Virtual Private Cloud consola en. https://console.aws.amazon.com/vpc/

  2. Cree una subred pública para su NAT puerta de enlace con un nombre descriptivo y anote el ID de subred. Para obtener instrucciones detalladas, consulte Crear una subred en su. VPC

  3. Cree una puerta de enlace a Internet para VPC poder comunicarse con Internet y anote el ID de la puerta de enlace. Adjunte la puerta de enlace de Internet a suVPC. Para obtener más instrucciones, consulte Crear y adjuntar una puerta de enlace de Internet.

  4. Aprovisione una NAT puerta de enlace pública para que los hosts de sus subredes privadas puedan acceder a su subred pública. Cuando cree la NAT puerta de enlace, seleccione la subred pública que creó anteriormente. Para obtener instrucciones, consulte Crear una NAT puerta de enlace.

  5. Configure sus tablas de enrutamiento. Debe tener dos tablas de enrutamiento en total para completar esta configuración. Debería disponer ya de una tabla de rutas principal que se haya creado automáticamente al mismo tiempo que la suyaVPC. En este paso, crea una tabla de enrutamiento adicional para su subred pública.

    1. Usa la siguiente configuración para modificar tu tabla VPC de rutas principal de modo que tus subredes privadas dirijan el tráfico a tu NAT puerta de enlace. Para obtener instrucciones, consulte Trabajar con tablas de enrutamiento en la Guía del usuario de Amazon Virtual Private Cloud.

      Tabla de MSKC rutas privadas
      Propiedad Valor
      Name tag (Etiqueta de nombre) Le recomendamos que asigne a esta tabla de enrutamiento una etiqueta descriptiva con su nombre para ayudarle a identificarla. Por ejemplo, Privada MSKC.
      Subredes asociadas Sus subredes privadas
      Una ruta para habilitar el acceso a Internet para MSK Connect
      • Destino: 0.0.0.0/0

      • Destino: el ID de su NAT puerta de enlace. Por ejemplo, nat-12a345bc6789efg1h.

      Una ruta local para el tráfico interno
      • Destino: 10.0.0.0/16. Este valor puede variar en función VPC del CIDR bloqueo que utilices.

      • Objetivo: local

    2. Siga las instrucciones en Creación de una tabla de enrutamiento personalizada para crear un cuadro de enrutamiento para la subred pública. Al crear la tabla, introduzca un nombre descriptivo en el campo Etiqueta de nombre para ayudarle a identificar la subred a la que está asociada la tabla. Por ejemplo, Public MSKC.

    3. Configure su tabla de MSKC rutas pública con los siguientes ajustes.

      Propiedad Valor
      Name tag (Etiqueta de nombre) Nombre descriptivo público MSKC o diferente que elija
      Subredes asociadas Su subred pública con NAT puerta de enlace
      Una ruta para habilitar el acceso a Internet para MSK Connect
      • Destino: 0.0.0.0/0

      • Destino: su ID de puerta de enlace de Internet. Por ejemplo, igw-1a234bc5.

      Una ruta local para el tráfico interno
      • Destino: 10.0.0.0/16. Este valor puede variar en función del CIDR bloque al VPC que pertenezcas.

      • Objetivo: local

Ahora que ha activado el acceso a Internet para Amazon MSK Connect, está listo para crear un conector.

Creación de un conector de origen Debezium

  1. Creación de un complemento personalizado
    1. Descargue el complemento My SQL Connector para obtener la última versión estable del sitio de Debezium. Anote la versión de lanzamiento de Debezium que haya descargado (la versión 2.x o la antigua serie 1.x). Más adelante en este procedimiento, creará un conector basado en su versión de Debezium.

    2. Descargue y extraiga el proveedor de configuración de AWS Secrets Manager.

    3. Coloque los siguientes archivos en el mismo directorio:

      • La carpeta debezium-connector-mysql

      • La carpeta jcusten-border-kafka-config-provider-aws-0.1.1

    4. Comprima el directorio que creó en el paso anterior en un ZIP archivo y, a continuación, cárguelo en un ZIP bucket de S3. Para obtener instrucciones, consulte Carga de objetos en la Guía del usuario de Amazon S3.

    5. Copie lo siguiente JSON y péguelo en un archivo. Por ejemplo, debezium-source-custom-plugin.json. Reemplazar <example-custom-plugin-name> con el nombre que quieres que tenga el plugin, <arn-of-your-s3-bucket> con el ARN del depósito de S3 en el que cargó el ZIP archivo y <file-key-of-ZIP-object> con la clave de archivo del ZIP objeto que cargó en S3.

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. Ejecuta el siguiente AWS CLI comando desde la carpeta en la que guardaste el JSON archivo para crear un complemento.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      Debería ver un resultado similar a este.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. Ejecute el siguiente comando para comprobar el estado del complemento. El estado debería cambiar de CREATING a ACTIVE. Sustituya el ARN marcador de posición por el ARN que aparece en el resultado del comando anterior.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. Configure AWS Secrets Manager y cree un secreto para las credenciales de su base de datos
    1. Abra la consola de Secrets Manager en https://console.aws.amazon.com/secretsmanager/.

    2. Cree un nuevo secreto para almacenar sus credenciales de inicio de sesión de base de datos. Para obtener instrucciones, consulte Creación de un secreto en la Guía del usuario de AWS Secrets Manager.

    3. Copia tus secretosARN.

    4. Agregue los permisos de Secrets Manager desde la siguiente política de ejemplo a su Rol de ejecución del servicio. Reemplazar <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> con el ARN de tu secreto.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      Para obtener instrucciones sobre cómo añadir IAM permisos, consulte Añadir y eliminar permisos de IAM identidad en la Guía del IAM usuario.

  3. Creación de una configuración de proceso de trabajo personalizada con información sobre su proveedor de configuración
    1. Copie las siguientes propiedades de configuración de proceso de trabajo en un archivo y sustituya las cadenas de marcadores de posición por valores que correspondan a su caso de uso. Para obtener más información sobre las propiedades de configuración del proveedor de configuración de AWS Secrets Manager, consulte SecretsManagerConfigProviderla documentación del complemento.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. Ejecute el siguiente AWS CLI comando para crear su configuración de trabajo personalizada.

      Reemplace los siguientes valores:

      • <my-worker-config-name> - un nombre descriptivo para su configuración de trabajo personalizada

      • <encoded-properties-file-content-string> - una versión codificada en base64 de las propiedades de texto sin formato que copió en el paso anterior

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. Crear un conector
    1. Copie lo siguiente JSON que corresponda a su versión de Debezium (2.x o 1.x) y péguelo en un archivo nuevo. Sustituya las cadenas <placeholder> por valores que correspondan a su caso de uso. Para obtener más información sobre cómo configurar un rol de ejecución del servicio, consulte Políticas y roles de IAM para MSK Connect.

      Tenga en cuenta que la configuración utiliza variables como ${secretManager:MySecret-1234:dbusername} en lugar de texto sin formato para especificar las credenciales de la base de datos. Sustituya MySecret-1234 por el nombre de su secreto y, a continuación, incluya el nombre de la clave que desea recuperar. También debe reemplazarla por la de su <arn-of-config-provider-worker-configuration> configuración ARN de trabajo personalizada.

      Debezium 2.x

      Para las versiones 2.x de Debezium, copie lo siguiente JSON y péguelo en un archivo nuevo. Sustituya el <placeholder> cadenas con valores que correspondan a su escenario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Para las versiones 1.x de Debezium, copie lo siguiente JSON y péguelo en un archivo nuevo. Sustituya el <placeholder> cadenas con valores que correspondan a su escenario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. Ejecute el siguiente AWS CLI comando en la carpeta en la que guardó el JSON archivo en el paso anterior.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      El siguiente es un ejemplo del resultado que se obtiene al ejecutar el comando correctamente.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

Para ver un ejemplo de conector Debezium con pasos detallados, consulte Introducción a Amazon MSK Connect: transmisión de datos desde y hacia sus clústeres de Apache Kafka mediante conectores gestionados.