Connecteur source Debezium avec fournisseur de configuration - Amazon Managed Streaming for Apache Kafka

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Connecteur source Debezium avec fournisseur de configuration

Cet exemple explique comment utiliser le plugin du connecteur Debezium MySQL avec une base de données Amazon Aurora compatible avec MySQL comme source. Dans cet exemple, nous avons également configuré le fournisseur de configuration AWS Secrets Manager open source pour externaliser les informations d'identification de la base de données dans AWS Secrets Manager. Pour en savoir plus sur les fournisseurs de configuration, consultez Externalisation d'informations sensibles à l'aide des fournisseurs de configuration.

Important

Le plugin du connecteur Debezium MySQL ne prend en charge qu'une seule tâche et ne fonctionne pas avec le mode de capacité de mise à l'échelle automatique pour Amazon MSK Connect. Vous devez plutôt utiliser le mode de capacité provisionné et définir la valeur de workerCount égale à un dans la configuration de votre connecteur. Pour en savoir plus sur les modes de capacité de MSK Connect, consultez Capacité du connecteur.

Avant de commencer

Votre connecteur doit être en mesure d'accéder à Internet afin de pouvoir interagir avec des services extérieurs au vôtre Amazon Virtual Private Cloud. AWS Secrets Manager Les étapes décrites dans cette section vous aident à effectuer les tâches suivantes pour activer l'accès à Internet.

  • Configurez un sous-réseau public qui héberge une passerelle NAT et achemine le trafic vers une passerelle Internet dans votre VPC.

  • Créez une route par défaut qui dirige le trafic de votre sous-réseau privé vers votre passerelle NAT.

Pour plus d’informations, consultez Activation de l'accès à Internet pour Amazon MSK Connect.

Prérequis

Avant de pouvoir activer l'accès à Internet, vous devez disposer des éléments suivants :

  • L'ID du Amazon Virtual Private Cloud (VPC) associé à votre cluster. Par exemple, vpc-123456ab.

  • L'ID des sous-réseaux privés de votre VPC. Par exemple, subnet-a1b2c3de, subnet-f4g5h6ij, etc. Vous devez configurer votre connecteur avec des sous-réseaux privés.

Pour activer l'accès à Internet pour votre connecteur
  1. Ouvrez la Amazon Virtual Private Cloud console à l'adresse https://console.aws.amazon.com/vpc/.

  2. Créez un sous-réseau public pour votre passerelle NAT avec un nom descriptif et notez l'ID du sous-réseau. Pour obtenir des informations détaillées, veuillez consulter Créer un sous-réseau dans votre VPC.

  3. Créez une passerelle Internet afin que votre VPC puisse communiquer avec Internet et notez l'ID de passerelle. Attachez la passerelle Internet à votre VPC. Pour de plus amples informations, consultez Créer et attacher une passerelle Internet.

  4. Provisionnez une passerelle NAT publique afin que les hôtes de vos sous-réseaux privés puissent accéder à votre sous-réseau public. Lorsque vous créez la passerelle NAT, sélectionnez le sous-réseau public que vous avez créé précédemment. Pour obtenir des informations, consultez Créer une passerelle NAT.

  5. Configurez vos tables de routage. Vous devez disposer de deux tables de routage au total pour terminer cette configuration. Vous devriez déjà avoir une table de routage principale créée automatiquement en même temps que votre VPC. Au cours de cette étape, vous créez une table de routage supplémentaire pour votre sous-réseau public.

    1. Utilisez les paramètres suivants pour modifier la table de routage principale de votre VPC afin que vos sous-réseaux privés acheminent le trafic vers votre passerelle NAT. Pour obtenir des informations, consultez la section Utiliser des tables de routage dans le Guide de l'utilisateur Amazon Virtual Private Cloud.

      Table de routage MSKC privée
      Propriété Valeur
      Identification de nom Nous vous recommandons de donner à cette table de routage un nom descriptif pour vous aider à l'identifier. Par exemple, MSKC privé.
      Sous-réseaux associés Vos sous-réseaux privés
      Une route pour permettre l'accès à Internet pour MSK Connect
      • Destination : 0.0.0.0/0

      • Cible : votre identifiant de passerelle NAT. Par exemple, nat-12a345bc6789efg1h.

      Une route pour l'ensemble du trafic local
      • Destination : 10.0.0.0/16. Cette valeur peut varier en fonction du bloc d'adresse CIDR de votre VPC.

      • Cible : locale

    2. Suivez les instructions de la section Créer une table de routage personnalisée pour créer une table de routage pour votre sous-réseau public. Lorsque vous créez la table, entrez un nom descriptif dans le champ Identification de nom pour vous aider à identifier le sous-réseau auquel la table est associée. Par exemple, MSKC public.

    3. Configurez votre table de routage MSKC public à l'aide des paramètres suivants.

      Propriété Valeur
      Identification de nom MSKC public ou un autre nom descriptif que vous choisissez
      Sous-réseaux associés Votre sous-réseau public avec passerelle NAT
      Une route pour permettre l'accès à Internet pour MSK Connect
      • Destination : 0.0.0.0/0

      • Cible : votre identifiant de passerelle Internet. Par exemple, igw-1a234bc5.

      Une route pour l'ensemble du trafic local
      • Destination : 10.0.0.0/16. Cette valeur peut varier en fonction du bloc d'adresse CIDR de votre VPC.

      • Cible : locale

Maintenant que vous avez activé l'accès à Internet pour Amazon MSK Connect, vous êtes prêt à créer un connecteur.

Création d'un connecteur source Debezium

  1. Créez un plugin personnalisé
    1. Téléchargez la dernière version stable du plugin du connecteur MySQL depuis le site Debezium. Notez la version de Debezium que vous téléchargez (version 2.x ou ancienne série 1.x). Vous allez créer ultérieurement un connecteur basé sur votre version de Debezium.

    2. Téléchargez et extrayez le fournisseur de configuration AWS Secrets Manager.

    3. Placez les archives suivantes dans le même répertoire :

      • Le dossier debezium-connector-mysql

      • Le dossier jcusten-border-kafka-config-provider-aws-0.1.1

    4. Compressez le répertoire que vous avez créé à l'étape précédente dans un fichier ZIP, puis chargez ce dernier dans un compartiment S3. Pour obtenir des informations, consultez Chargement d'objets dans le Guide de l'utilisateur Amazon S3.

    5. Copiez le code JSON et collez-le dans un fichier. Par exemple, debezium-source-custom-plugin.json. Remplacez < example-custom-plugin-name > par le nom que vous souhaitez attribuer au plugin, < arn-of-your-s 3-bucket> par l'ARN du compartiment S3 dans lequel vous avez chargé le fichier ZIP et <file-key-of-ZIP-object> par la clé de fichier de l'objet ZIP que vous avez chargé sur S3.

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. Exécutez la AWS CLI commande suivante depuis le dossier dans lequel vous avez enregistré le fichier JSON pour créer un plugin.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      Vous devez voir un résultat similaire à ce qui suit.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. Exécutez la commande suivante pour vérifier le statut du plugin. Le statut doit passer de CREATING à ACTIVE. Remplacez l'espace réservé de l'ARN par l'ARN que vous avez obtenu dans le résultat de la commande précédente.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. Configurer AWS Secrets Manager et créer un secret pour les informations d'identification de votre base de données
    1. Ouvrez la console Secrets Manager en suivant le lien https://console.aws.amazon.com/secretsmanager/.

    2. Créez un nouveau secret pour stocker les informations d'identification de connexion à votre base de données. Pour obtenir des informations, consultez la section Créer un secret dans le Guide de l'utilisateur AWS Secrets Manager.

    3. Copiez l'ARN de votre secret.

    4. Ajoutez les autorisations Secrets Manager de l'exemple de politique suivant à votre Rôles d'exécution du service. Remplacez <arn:aws:secretsmanager:us-east- 1:123456789000:secret : -1234> par l'ARN de votre secret. MySecret

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      Pour obtenir des informations sur la façon d'ajouter des autorisations IAM, consultez Ajout et suppression d'autorisations basées sur l'identité IAM dans le Guide d'utilisateur IAM.

  3. Créez une configuration de worker personnalisée avec des informations sur votre fournisseur de configuration
    1. Copiez les propriétés de configuration du worker suivantes dans un fichier, en remplaçant les chaînes d'espace réservé par des valeurs correspondant à votre scénario. Pour en savoir plus sur les propriétés de configuration du fournisseur de configuration AWS Secrets Manager, consultez SecretsManagerConfigProviderla documentation du plugin.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. Exécutez la AWS CLI commande suivante pour créer votre configuration de travail personnalisée.

      Remplacez les valeurs suivantes :

      • < my-worker-config-name > - un nom descriptif pour votre configuration de travail personnalisée

      • < encoded-properties-file-content -string> - une version codée en base64 des propriétés en texte brut que vous avez copiées à l'étape précédente

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. Créez un connecteur
    1. Copiez le code JSON suivant qui correspond à votre version de Debezium (2.x ou 1.x) et collez-le dans un nouveau fichier. Remplacez les chaînes <placeholder> par des valeurs correspondant à votre scénario. Pour plus d'informations sur la configuration d'un rôle d'exécution de service, consultez Rôles et politiques IAM pour MSK Connect.

      Notez que la configuration utilise des variables comme ${secretManager:MySecret-1234:dbusername} plutôt que du texte brut pour spécifier les informations d'identification de la base de données. Remplacez MySecret-1234 par le nom de votre secret, puis indiquez le nom de la clé que vous souhaitez récupérer. Vous devez également remplacer <arn-of-config-provider-worker-configuration> par l'ARN de votre configuration de worker personnalisée.

      Debezium 2.x

      Pour les versions 2.x de Debezium, copiez le code JSON suivant et collez-le dans un nouveau fichier. Remplacez les chaînes <espace réservé> par des valeurs correspondant à votre scénario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Pour les versions 1.x de Debezium, copiez le code JSON suivant et collez-le dans un nouveau fichier. Remplacez les chaînes <espace réservé> par des valeurs correspondant à votre scénario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. Exécutez la AWS CLI commande suivante dans le dossier où vous avez enregistré le fichier JSON à l'étape précédente.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      Voici un exemple du résultat que vous obtenez lorsque vous exécutez la commande.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

Pour avoir un exemple de connecteur Debezium avec des étapes détaillées, consultez Présentation d'Amazon MSK Connect - Diffusez des données vers et depuis vos clusters Apache Kafka à l'aide de connecteurs gérés.