Conector de origem Debezium com provedor de configuração - Amazon Managed Streaming for Apache Kafka

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Conector de origem Debezium com provedor de configuração

Este exemplo mostra como usar o plug-in Debezium My SQL connector com um banco de dados SQL Amazon Aurora compatível com My como fonte. Neste exemplo, também configuramos o AWS Secrets Manager Config Provider de código aberto para externalizar as credenciais do banco de dados no AWS Secrets Manager. Para saber mais sobre os provedores de configuração, consulte Externalizando informações confidenciais usando provedores de configuração.

Importante

O plug-in Debezium My SQL connector suporta apenas uma tarefa e não funciona com o modo de capacidade com escalabilidade automática para o Amazon Connect. MSK Em vez disso, você deve usar o modo de capacidade provisionada e definir workerCount igual a um na configuração do conector. Para saber mais sobre os modos de capacidade do MSK Connect, consulteCapacidade do conector.

Antes de começar

Seu conector deve ser capaz de acessar a Internet para poder interagir com serviços como os AWS Secrets Manager que estão fora do seu Amazon Virtual Private Cloud. As etapas desta seção ajudam você a concluir as tarefas a seguir para habilitar o acesso à Internet.

  • Configure uma sub-rede pública que hospede um NAT gateway e direcione o tráfego para um gateway de internet em seuVPC.

  • Crie uma rota padrão que direcione seu tráfego de sub-rede privada para seu NAT gateway.

Para ter mais informações, consulte Como habilitar o acesso à Internet para o Amazon MSK Connect.

Pré-requisitos

Antes de habilitar o acesso à Internet, você precisa dos seguintes itens:

  • O ID do Amazon Virtual Private Cloud (VPC) associado ao seu cluster. Por exemplo, vpc-123456ab.

  • A IDs das sub-redes privadas em sua. VPC Por exemplo, subnet-a1b2c3de, subnet-f4g5h6ij etc. Você deve configurar seu conector com sub-redes privadas.

Para habilitar o acesso à Internet para seu conector
  1. Abra o Amazon Virtual Private Cloud console em https://console.aws.amazon.com/vpc/.

  2. Crie uma sub-rede pública para seu NAT gateway com um nome descritivo e anote o ID da sub-rede. Para obter instruções detalhadas, consulte Criar uma sub-rede no seu VPC.

  3. Crie um gateway de internet para que você VPC possa se comunicar com a internet e anote o ID do gateway. Conecte o gateway de internet ao seuVPC. Para obter mais instruções, consulte Criar e anexar um gateway da Internet à VPC.

  4. Provisione um NAT gateway público para que os hosts em suas sub-redes privadas possam acessar sua sub-rede pública. Ao criar o NAT gateway, selecione a sub-rede pública que você criou anteriormente. Para obter instruções, consulte Criar um NAT gateway.

  5. Configure suas tabelas de rotas. Para concluir essa configuração, você deve ter duas tabelas de rotas no total. Você já deve ter uma tabela de rotas principal que foi criada automaticamente ao mesmo tempo que a suaVPC. Nesta etapa, você cria uma tabela de rotas adicional para sua sub-rede pública.

    1. Use as configurações a seguir para modificar sua tabela VPC de rotas principal para que suas sub-redes privadas roteiem o tráfego para seu NAT gateway. Para obter instruções, consulte Trabalhar com tabelas de rotas no Guia do usuário do Amazon Virtual Private Cloud.

      Tabela de MSKC rotas privadas
      Propriedade Valor
      Name tag Recomendamos que você atribua uma tag de nome descritivo a essa tabela de rotas para ajudar na identificação dela. Por exemplo, Privado MSKC.
      Sub-redes associadas Suas sub-redes privadas
      Uma rota para habilitar o acesso à Internet para o MSK Connect
      • Destino: 0.0.0.0/0

      • Alvo: Seu ID de NAT gateway. Por exemplo, nat-12a345bc6789efg1h.

      Uma rota local para o tráfego interno
      • Destino: 10.0.0.0/16 Esse valor pode variar dependendo VPC do seu CIDR bloqueio.

      • Alvo: local

    2. Siga as instruções em Criar uma tabela de rotas personalizada para criar uma tabela de rotas para sua sub-rede pública. Ao criar a tabela, insira um nome descritivo no campo Tag de nome para ajudar você a identificar a qual sub-rede a tabela está associada. Por exemplo, Público MSKC.

    3. Configure sua tabela de MSKC rotas públicas usando as seguintes configurações.

      Propriedade Valor
      Name tag Nome público MSKC ou um nome descritivo diferente que você escolher
      Sub-redes associadas Sua sub-rede pública com gateway NAT
      Uma rota para habilitar o acesso à Internet para o MSK Connect
      • Destino: 0.0.0.0/0

      • Alvo: o ID do seu gateway da Internet Por exemplo, igw-1a234bc5.

      Uma rota local para o tráfego interno
      • Destino: 10.0.0.0/16 Esse valor pode variar dependendo VPC do seu CIDR bloqueio.

      • Alvo: local

Agora que você habilitou o acesso à Internet para o Amazon MSK Connect, você está pronto para criar um conector.

Como criar um conector de origem do Debezium

  1. Criar um plug-in personalizado
    1. Baixe o plug-in My SQL connector para obter a versão estável mais recente no site do Debezium. Anote a versão do Debezium que você baixou (versão 2.x ou a antiga série 1.x). Você criará um conector com base na sua versão do Debezium mais adiante neste procedimento.

    2. Baixe e extraia o AWS Secrets Manager Config Provider.

    3. Coloque os seguintes arquivos no mesmo diretório:

      • A pasta debezium-connector-mysql.

      • A pasta jcusten-border-kafka-config-provider-aws-0.1.1.

    4. Comprima o diretório que você criou na etapa anterior em um ZIP arquivo e, em seguida, carregue o ZIP arquivo em um bucket do S3. Para obter instruções, consulte Upload de objetos no Guia do usuário do Amazon S3.

    5. Copie o seguinte JSON e cole-o em um arquivo. Por exemplo, debezium-source-custom-plugin.json. Substituir <example-custom-plugin-name> com o nome que você deseja que o plugin tenha, <arn-of-your-s3-bucket> com o ARN do bucket do S3 em que você fez o upload do ZIP arquivo e <file-key-of-ZIP-object> com a chave de arquivo do ZIP objeto que você carregou no S3.

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. Execute o AWS CLI comando a seguir na pasta em que você salvou o JSON arquivo para criar um plug-in.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      Você deve ver uma saída semelhante ao seguinte exemplo.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. Execute o comando a seguir para verificar o estado do plug-in. O estado do cluster deve mudar de CREATING para ACTIVE. Substitua o ARN espaço reservado pelo ARN que você obteve na saída do comando anterior.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. Configure AWS Secrets Manager e crie um segredo para suas credenciais de banco de dados
    1. Abra o console do Secrets Manager em https://console.aws.amazon.com/secretsmanager/.

    2. Crie um novo segredo para armazenar as credenciais de login do banco de dados. Para obter instruções, consulte Criar um segredo no Guia do usuário do AWS Secrets Manager.

    3. Copie seus segredosARN.

    4. Adicione as permissões do Secrets Manager do exemplo de política a seguir ao seu Perfil de execução do serviço. Substituir <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> com o ARN do seu segredo.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      Para obter instruções sobre como adicionar IAM permissões, consulte Adicionar e remover permissões de IAM identidade no Guia do IAM usuário.

  3. Criar uma configuração personalizada de operador com informações sobre seu provedor de configuração
    1. Copie as seguintes propriedades de configuração do operador em um arquivo, substituindo as strings de espaço reservado por valores que correspondam ao seu cenário. Para saber mais sobre as propriedades de configuração do AWS Secrets Manager Config Provider, consulte a SecretsManagerConfigProviderdocumentação do plug-in.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. Execute o AWS CLI comando a seguir para criar sua configuração de trabalhador personalizada.

      Substitua os valores a seguir:

      • <my-worker-config-name> - um nome descritivo para sua configuração de trabalhador personalizada

      • <encoded-properties-file-content-string> - uma versão codificada em base64 das propriedades de texto simples que você copiou na etapa anterior

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. Criar um conector
    1. Copie o seguinte JSON que corresponde à sua versão do Debezium (2.x ou 1.x) e cole-o em um novo arquivo. Substitua as strings <placeholder> por valores que correspondam ao seu cenário. Para obter mais informações sobre como configurar um perfil de execução de serviços, consulte Perfis e políticas do IAM para o MSK Connect.

      Para especificar as credenciais do banco de dados, a configuração usa variáveis como ${secretManager:MySecret-1234:dbusername} em vez de texto simples. Substitua MySecret-1234 pelo nome do seu segredo e inclua o nome da chave que você deseja recuperar. Você também deve <arn-of-config-provider-worker-configuration> substituir pela configuração ARN de seu trabalhador personalizado.

      Debezium 2.x

      Para versões do Debezium 2.x, copie o seguinte JSON e cole-o em um novo arquivo. Substitua o <placeholder> cadeias de caracteres com valores que correspondem ao seu cenário.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Para versões do Debezium 1.x, copie o seguinte JSON e cole-o em um novo arquivo. Substitua o <placeholder> cadeias de caracteres com valores que correspondem ao seu cenário.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. Execute o AWS CLI comando a seguir na pasta em que você salvou o JSON arquivo na etapa anterior.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      Veja a seguir um exemplo da saída que você vai obter ao executar o comando com êxito.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

Para ver um exemplo de conector Debezium com etapas detalhadas, consulte Introdução ao Amazon MSK Connect - Transmita dados de e para seus clusters do Apache Kafka usando conectores gerenciados.