Connettore di origine Debezium con provider di configurazione - Amazon Managed Streaming per Apache Kafka

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Connettore di origine Debezium con provider di configurazione

Questo esempio mostra come utilizzare il plug-in del connettore Debezium MySQL con un database Amazon Aurora compatibile con MySQL come origine. In questo esempio, abbiamo anche configurato il provider open source AWS Secrets Manager Config Provider per esternalizzare le credenziali del database in AWS Secrets Manager. Per ulteriori informazioni sui provider di configurazione, consulta la pagina Esternalizzazione di informazioni sensibili utilizzando i provider di configurazione.

Importante

Il plug-in del connettore Debezium MySQL supporta solo un'attività e non funziona con la modalità di capacità con dimensionamento automatico per Amazon MSK Connect. Dovresti invece utilizzare la modalità di capacità assegnata e impostare il valore workerCount su uno una nella configurazione del connettore. Per ulteriori informazioni sulle modalità di capacità di MSK Connect, consulta la pagina Capacità del connettore.

Prima di iniziare

Il connettore deve essere in grado di accedere a Internet in modo da poter interagire con servizi esterni, ad esempio AWS Secrets Manager, esterni al tuo Amazon Virtual Private Cloud. I passaggi descritti in questa sezione consentono di completare le seguenti attività per abilitare l'accesso a Internet.

  • Configura una sottorete pubblica che ospita un gateway NAT e indirizza il traffico verso un gateway Internet nel tuo VPC.

  • Crea una route predefinita che indirizza il traffico della sottorete privata verso il gateway NAT.

Per ulteriori informazioni, consulta la pagina Abilitazione dell'accesso a Internet per Amazon MSK Connect.

Prerequisiti

Prima di abilitare l'accesso a Internet, devi disporre dei seguenti elementi:

  • L'ID del Amazon Virtual Private Cloud (VPC) associato al cluster. Ad esempio, vpc-123456ab.

  • Gli ID delle sottoreti private nel VPC. Ad esempio, subnet-a1b2c3de, subnet-f4g5h6ij e così via. Il connettore deve essere configurato con sottoreti private.

Abilitazione dell'accesso a Internet per il connettore
  1. Aprire la console Amazon Virtual Private Cloud all'indirizzo https://console.aws.amazon.com/vpc/.

  2. Crea una sottorete pubblica con un nome descrittivo per il gateway NAT e prendi nota dell'ID della sottorete. Per istruzioni dettagliate, consulta la pagina Create a subnet in your VPC.

  3. Crea un gateway Internet in modo che il VPC possa comunicare con Internet e prendi nota dell'ID del gateway. Collega il gateway Internet al VPC. Per istruzioni, consulta la pagina Create and attach an internet gateway.

  4. Fornisci un gateway NAT pubblico in modo che gli host delle tue sottoreti private possano raggiungere la tua sottorete pubblica. Quando crei il gateway NAT, seleziona la sottorete pubblica creata in precedenza. Per istruzioni, consulta Creazione di un gateway NAT.

  5. Configura le tabelle di routing. Per completare questa configurazione, occorrono in totale due tabelle di routing. Dovresti già disporre di una tabella di routing principale creata in automatico al momento della creazione del VPC. In questo passaggio creerai una tabella di routing aggiuntiva per la sottorete pubblica.

    1. Utilizza le seguenti impostazioni per modificare la tabella di routing principale del tuo VPC in modo che le sottoreti private instradino il traffico verso il tuo gateway NAT. Per le istruzioni, consulta la pagina Work with route tables nella Guida per l'utente di Amazon Virtual Private Cloud.

      Tabella di routing MSKC privata
      Proprietà Valore
      Name tag (Tag nome) Ti consigliamo di assegnare a questa tabella di routing un nome descrittivo per facilitarne l'identificazione. Ad esempio, MSKC privata.
      Sottoreti associate Le tue sottoreti private
      Un percorso per consentire l'accesso a Internet per MSK Connect
      • Destinazione: 0.0.0.0/0

      • Obiettivo: l'ID del gateway NAT. Ad esempio, nat-12a345bc6789efg1h.

      Un percorso per tutto il traffico locale
      • Destinazione: 10.0.0.0/16. Questo valore può variare a seconda del blocco CIDR del tuo VPC.

      • Obiettivo: locale

    2. Segui le istruzioni riportate alla pagina Create a custom route table per creare una tabella di routing per la sottorete pubblica. Quando crei la tabella, inserisci un nome descrittivo nel campo Tag nome per aiutarti a identificare a quale sottorete è associata la tabella. Ad esempio, MSKC pubblica.

    3. Configura la tua tabella di routing MSKC pubblica utilizzando le seguenti impostazioni.

      Proprietà Valore
      Name tag (Tag nome) MSKC pubblica o un altro nome descrittivo a scelta
      Sottoreti associate La tua sottorete pubblica con gateway NAT
      Un percorso per consentire l'accesso a Internet per MSK Connect
      • Destinazione: 0.0.0.0/0

      • Obiettivo: l'ID del gateway Internet. Ad esempio, igw-1a234bc5.

      Un percorso per tutto il traffico locale
      • Destinazione: 10.0.0.0/16. Questo valore può variare a seconda del blocco CIDR del tuo VPC.

      • Obiettivo: locale

Ora che hai abilitato l'accesso a Internet per Amazon MSK Connect, puoi creare un connettore.

Creazione di un connettore di origine Debezium

  1. Creazione di un plug-in personalizzato
    1. Scarica il plug-in del connettore MySQL per l'ultima versione stabile dal sito Debezium. Prendi nota della versione di rilascio di Debezium che scarichi (versione 2.x o la vecchia serie 1.x). Più avanti in questa procedura, creerai un connettore basato sulla tua versione di Debezium.

    2. Scarica ed estrai AWS Secrets Manager Config Provider.

    3. Colloca i seguenti archivi nella stessa directory:

      • La cartella debezium-connector-mysql

      • La cartella jcusten-border-kafka-config-provider-aws-0.1.1

    4. Comprimi la directory che hai creato nel passaggio precedente in un file ZIP, quindi carica il file ZIP in un bucket S3. Per istruzioni, consulta la pagina Uploading objects in Amazon S3 nella Guida per l'utente di Amazon S3.

    5. Copia il codice JSON seguente e incollalo in un file. Ad esempio, debezium-source-custom-plugin.json. Sostituisci <example-custom-plugin-name> con il nome che desideri assegnare al plug-in, <arn-of-your-s3-bucket> con l'ARN del bucket S3 in cui hai caricato il file ZIP e <file-key-of-ZIP-object> con la chiave del file dell'oggetto ZIP che hai caricato su S3.

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. Esegui il comando AWS CLI dalla cartella in cui hai salvato il file JSON per creare un plug-in.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      Verrà visualizzato un output simile al seguente.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. Esegui il comando seguente per verificare lo stato del plug-in. Lo stato del cluster dovrebbe passare da CREATING a ACTIVE. Sostituisci il segnaposto ARN con l'ARN ottenuto nell'output del comando precedente.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. Configurazione di AWS Secrets Manager e creazione di un segreto per le credenziali del tuo database
    1. Apri la console di Secrets Manager all'indirizzo https://console.aws.amazon.com/secretsmanager/.

    2. Crea un nuovo segreto per archiviare le credenziali di accesso al database. Per le istruzioni, consulta la pagina Create a secret nella Guida per l'utente di AWS Secrets Manager.

    3. Copia l'ARN del segreto.

    4. Aggiungi le autorizzazioni di Secrets Manager dalla seguente policy di esempio al tuo Ruolo di esecuzione del servizio. Sostituisci <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> con l'ARN del segreto.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      Per istruzioni sull'aggiunta di autorizzazioni IAM, consulta la pagina Adding and removing IAM identity permissions nella Guida per l'utente di IAM.

  3. Creazione di una configurazione del worker personalizzata con informazioni sul proprio provider di configurazione
    1. Copia le seguenti proprietà di configurazione del worker in un file, sostituendo le stringhe segnaposto con valori che corrispondono al tuo scenario. Per ulteriori informazioni sulle proprietà di configurazione per il provider di configurazione di AWS Secrets Manager, consulta la sezione SecretsManagerConfigProvider nella documentazione del plug-in.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. Esegui il comando AWS CLI per creare la configurazione del worker personalizzata.

      Sostituisci i valori seguenti:

      • <my-worker-config-name>: un nome descrittivo per la configurazione del worker personalizzata

      • <encoded-properties-file-content-string>: una versione con codifica base64 delle proprietà di testo non crittografate copiate nel passaggio precedente

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. Creazione di un connettore
    1. Copia il codice JSON seguente, che corrisponde alla tua versione di Debezium (2.x o 1.x), e incollalo in un nuovo file. Sostituisci le stringhe <placeholder> con valori che corrispondono al tuo scenario. Per informazioni su come configurare un ruolo di esecuzione del servizio, consulta la pagina Ruoli e policy IAM per MSK Connect.

      Nota che la configurazione utilizza variabili come ${secretManager:MySecret-1234:dbusername} anziché testo non crittografato per specificare le credenziali del database. Sostituisci MySecret-1234 con il nome del tuo segreto, poi includi il nome della chiave che desideri recuperare. È inoltre necessario sostituire <arn-of-config-provider-worker-configuration> con l'ARN della configurazione del worker personalizzata.

      Debezium 2.x

      Per le versioni di Debezium 2.x, copia il codice JSON seguente e incollalo in un nuovo file. Sostituisci le stringhe <placeholder> con valori che corrispondono al tuo scenario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Per le versioni di Debezium 1.x, copia il codice JSON seguente e incollalo in un nuovo file. Sostituisci le stringhe <placeholder> con valori che corrispondono al tuo scenario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. Esegui il comando AWS CLI nella cartella di salvataggio del file JSON del passaggio precedente.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      Di seguito è riportato un esempio dell'output che si ottiene eseguendo correttamente il comando.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

Per un esempio di connettore Debezium con i passaggi dettagliati, consulta la pagina Introducing Amazon MSK Connect - Stream Data to and from Your Apache Kafka Clusters Using Managed Connectors.