Connettore di origine Debezium con provider di configurazione - Amazon Managed Streaming per Apache Kafka

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Connettore di origine Debezium con provider di configurazione

Questo esempio mostra come usare il plugin Debezium My SQL connector con un database SQL Amazon Aurora compatibile con My come sorgente. In questo esempio, abbiamo anche configurato il provider open source AWS Secrets Manager Config Provider per esternalizzare le credenziali del database in AWS Secrets Manager. Per ulteriori informazioni sui provider di configurazione, consulta la pagina Esternalizzazione di informazioni sensibili utilizzando i provider di configurazione.

Importante

Il plugin Debezium My SQL connector supporta solo un'operazione e non funziona con la modalità di capacità scalata automatica per Amazon Connect. MSK Dovresti invece utilizzare la modalità di capacità assegnata e impostare il valore workerCount su uno una nella configurazione del connettore. Per ulteriori informazioni sulle modalità di capacità di MSK Connect, vedereCapacità del connettore.

Prima di iniziare

Il connettore deve essere in grado di accedere a Internet in modo da poter interagire con servizi esterni al proprio Amazon Virtual Private Cloud. AWS Secrets Manager I passaggi descritti in questa sezione consentono di completare le seguenti attività per abilitare l'accesso a Internet.

  • Configura una sottorete pubblica che ospita un NAT gateway e indirizza il traffico verso un gateway Internet del tuoVPC.

  • Crea un percorso predefinito che indirizza il traffico della sottorete privata verso il gateway. NAT

Per ulteriori informazioni, consulta Abilitazione dell'accesso a Internet per Amazon MSK Connect.

Prerequisiti

Prima di abilitare l'accesso a Internet, devi disporre dei seguenti elementi:

  • L'ID del Amazon Virtual Private Cloud (VPC) associato al cluster. Ad esempio, vpc-123456ab.

  • Le IDs sottoreti private del tuo. VPC Ad esempio, subnet-a1b2c3de, subnet-f4g5h6ij e così via. Il connettore deve essere configurato con sottoreti private.

Abilitazione dell'accesso a Internet per il connettore
  1. Apri la Amazon Virtual Private Cloud console all'indirizzo. https://console.aws.amazon.com/vpc/

  2. Crea una sottorete pubblica per il NAT gateway con un nome descrittivo e annota l'ID della sottorete. Per istruzioni dettagliate, consulta Creare una sottorete in. VPC

  3. Crea un gateway Internet in modo da VPC poter comunicare con Internet e annota l'ID del gateway. Collega il gateway Internet al tuoVPC. Per istruzioni, consulta la pagina Create and attach an internet gateway.

  4. Fornisci un NAT gateway pubblico in modo che gli host delle tue sottoreti private possano raggiungere la tua sottorete pubblica. Quando crei il NAT gateway, seleziona la sottorete pubblica creata in precedenza. Per istruzioni, consulta Creare un NAT gateway.

  5. Configura le tabelle di routing. Per completare questa configurazione, occorrono in totale due tabelle di routing. Dovresti già avere una tabella di routing principale creata automaticamente contemporaneamente alla tuaVPC. In questo passaggio creerai una tabella di routing aggiuntiva per la sottorete pubblica.

    1. Utilizza le seguenti impostazioni per modificare la VPC tabella delle rotte principali in modo che le sottoreti private indirizzino il traffico verso il NAT gateway. Per le istruzioni, consulta la pagina Utilizzo delle tabelle di routing nella Guida per l'utente di Amazon Virtual Private Cloud.

      Tabella dei percorsi privati MSKC
      Proprietà Valore
      Name tag (Tag nome) Ti consigliamo di assegnare a questa tabella di routing un nome descrittivo per facilitarne l'identificazione. Ad esempio, Private MSKC.
      Sottoreti associate Le tue sottoreti private
      Un percorso per abilitare l'accesso a Internet per MSK Connect
      • Destinazione: 0.0.0.0/0

      • Obiettivo: il tuo ID NAT gateway. Ad esempio, nat-12a345bc6789efg1h.

      Un percorso per tutto il traffico locale
      • Destinazione: 10.0.0.0/16. Questo valore può variare a seconda VPC del tuo CIDR blocco.

      • Obiettivo: locale

    2. Segui le istruzioni riportate nella pagina Creazione di una tabella di routing personalizzata per creare una tabella di routing per la sottorete pubblica. Quando crei la tabella, inserisci un nome descrittivo nel campo Tag nome per identificare a quale sottorete è associata la tabella. Ad esempio, Public MSKC.

    3. Configura la tua tabella di MSKC routing pubblica utilizzando le seguenti impostazioni.

      Proprietà Valore
      Name tag (Tag nome) Nome pubblico MSKC o descrittivo diverso a tua scelta
      Sottoreti associate La tua sottorete pubblica con gateway NAT
      Un percorso per abilitare l'accesso a Internet per MSK Connect
      • Destinazione: 0.0.0.0/0

      • Obiettivo: l'ID del gateway Internet. Ad esempio, igw-1a234bc5.

      Un percorso per tutto il traffico locale
      • Destinazione: 10.0.0.0/16. Questo valore può variare a seconda VPC del tuo CIDR blocco.

      • Obiettivo: locale

Ora che hai abilitato l'accesso a Internet per Amazon MSK Connect, sei pronto per creare un connettore.

Creazione di un connettore di origine Debezium

  1. Creazione di un plug-in personalizzato
    1. Scarica il plugin My SQL connector per l'ultima versione stabile dal sito Debezium. Prendi nota della versione di rilascio di Debezium che scarichi (versione 2.x o la vecchia serie 1.x). Più avanti in questa procedura, creerai un connettore basato sulla tua versione di Debezium.

    2. Scarica ed estrai AWS Secrets Manager Config Provider.

    3. Colloca i seguenti archivi nella stessa directory:

      • La cartella debezium-connector-mysql

      • La cartella jcusten-border-kafka-config-provider-aws-0.1.1

    4. Comprimi la directory che hai creato nel passaggio precedente in un ZIP file e poi carica il ZIP file in un bucket S3. Per istruzioni, consulta la pagina Uploading objects in Amazon S3 nella Guida per l'utente di Amazon S3.

    5. Copia quanto segue JSON e incollalo in un file. Ad esempio, debezium-source-custom-plugin.json. Replace (Sostituisci) <example-custom-plugin-name> con il nome che vuoi che abbia il plugin, <arn-of-your-s3-bucket> con il ARN bucket S3 in cui hai caricato il ZIP file e <file-key-of-ZIP-object> con la chiave del file dell'ZIPoggetto che hai caricato su S3.

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. Esegui il seguente AWS CLI comando dalla cartella in cui hai salvato il JSON file per creare un plugin.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      Verrà visualizzato un output simile al seguente.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. Esegui il comando seguente per verificare lo stato del plug-in. Lo stato del cluster dovrebbe passare da CREATING a ACTIVE. Sostituisci il ARN segnaposto con ARN quello che hai ottenuto nell'output del comando precedente.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. Configura AWS Secrets Manager e crea un segreto per le credenziali del tuo database
    1. Apri la console Secrets Manager all'indirizzo https://console.aws.amazon.com/secretsmanager/.

    2. Crea un nuovo segreto per archiviare le credenziali di accesso al database. Per le istruzioni, consulta la pagina Create a secret nella Guida per l'utente di AWS Secrets Manager.

    3. Copia i tuoi segretiARN.

    4. Aggiungi le autorizzazioni di Secrets Manager dalla seguente policy di esempio al tuo Ruolo di esecuzione del servizio. Replace (Sostituisci) <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234> con ARN il tuo segreto.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      Per istruzioni su come aggiungere IAM autorizzazioni, consulta Aggiungere e rimuovere le autorizzazioni di IAM identità nella Guida per l'IAMutente.

  3. Creazione di una configurazione del worker personalizzata con informazioni sul proprio provider di configurazione
    1. Copia le seguenti proprietà di configurazione del worker in un file, sostituendo le stringhe segnaposto con valori che corrispondono al tuo scenario. Per ulteriori informazioni sulle proprietà di configurazione per il provider di configurazione di AWS Secrets Manager Config, SecretsManagerConfigProviderconsultate la documentazione del plugin.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. Esegui il AWS CLI comando seguente per creare la tua configurazione di lavoro personalizzata.

      Sostituisci i valori seguenti:

      • <my-worker-config-name> - un nome descrittivo per la configurazione personalizzata del lavoratore

      • <encoded-properties-file-content-string> - una versione con codifica base64 delle proprietà di testo in chiaro copiate nel passaggio precedente

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. Creazione di un connettore
    1. Copia quanto segue JSON che corrisponde alla tua versione di Debezium (2.x o 1.x) e incollalo in un nuovo file. Sostituisci le stringhe <placeholder> con valori che corrispondono al tuo scenario. Per informazioni su come configurare un ruolo di esecuzione del servizio, consulta la pagina Ruoli e policy IAM per MSK Connect.

      Nota che la configurazione utilizza variabili come ${secretManager:MySecret-1234:dbusername} anziché testo non crittografato per specificare le credenziali del database. Sostituisci MySecret-1234 con il nome del tuo segreto, poi includi il nome della chiave che desideri recuperare. È inoltre necessario sostituirlo <arn-of-config-provider-worker-configuration> con la configurazione personalizzata ARN del lavoratore.

      Debezium 2.x

      Per le versioni di Debezium 2.x, copia quanto segue JSON e incollalo in un nuovo file. Sostituisci il <placeholder> stringhe con valori che corrispondono al tuo scenario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Per le versioni di Debezium 1.x, copia quanto segue JSON e incollalo in un nuovo file. Sostituisci il <placeholder> stringhe con valori che corrispondono al tuo scenario.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. Esegui il AWS CLI comando seguente nella cartella in cui hai salvato il JSON file nel passaggio precedente.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      Di seguito è riportato un esempio dell'output che si ottiene eseguendo correttamente il comando.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }

Per un esempio di connettore Debezium con passaggi dettagliati, consulta Introduzione ad Amazon MSK Connect - Stream di dati da e verso i cluster Apache Kafka utilizzando connettori gestiti.