Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Workers
Un worker est un processus de machine virtuelle Java (JVM) qui exécute la logique du connecteur. Chaque worker crée un ensemble de tâches qui s'exécutent dans des threads parallèles et se chargent de copier les données. Les tâches ne stockent pas l'état et peuvent donc être démarrées, arrêtées ou redémarrées à tout moment afin de fournir un pipeline de données résilient et évolutif. Les modifications du nombre de workers, qu'elles soient dues à un événement de mise à l'échelle ou à des défaillances inattendues, sont automatiquement détectées par les autres workers. Ils se coordonnent pour rééquilibrer les tâches entre les workers restants. Les workers de Connect utilisent les groupes de consommateurs d'Apache Kafka pour coordonner et rééquilibrer leurs tâches.
Si les exigences de capacité de votre connecteur sont variables ou difficiles à estimer, vous pouvez laisser MSK Connect ajuster le nombre de workers selon vos besoins entre une limite inférieure et une limite supérieure que vous spécifiez. Vous pouvez également spécifier le nombre exact de workers que vous souhaitez exécuter sur votre logique de connecteur. Pour plus d’informations, consultez Capacité du connecteur.
Les employés de MSK Connect consomment des adresses IP
Les employés de MSK Connect consomment des adresses IP dans les sous-réseaux fournis par le client. Chaque travailleur utilise une adresse IP provenant de l'un des sous-réseaux fournis par le client. Vous devez vous assurer que vous disposez d'un nombre suffisant d'adresses IP disponibles dans les sous-réseaux fournis à une CreateConnector demande pour tenir compte de leur capacité spécifiée, en particulier lors du dimensionnement automatique des connecteurs où le nombre de travailleurs peut fluctuer.
Rubriques
Configuration du processus worker par défaut
MSK Connect fournit la configuration de worker par défaut suivante :
key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter
Propriétés de configuration de l'environnement de worker compatibles
MSK Connect fournit une configuration de worker par défaut. Vous avez également la possibilité de créer une configuration de worker personnalisée à utiliser avec vos connecteurs. La liste suivante inclut des informations sur les propriétés de configuration de worker prises en charge ou non par Amazon MSK Connect.
-
Seules les propriétés
key.converter
etvalue.converter
sont obligatoires. -
MSK Connect prend en charge les propriétés de configuration
producer.
suivantes.producer.acks producer.batch.size producer.buffer.memory producer.compression.type producer.enable.idempotence producer.key.serializer producer.max.request.size producer.metadata.max.age.ms producer.metadata.max.idle.ms producer.partitioner.class producer.reconnect.backoff.max.ms producer.reconnect.backoff.ms producer.request.timeout.ms producer.retry.backoff.ms producer.value.serializer
-
MSK Connect prend en charge les propriétés de configuration
consumer.
suivantes.consumer.allow.auto.create.topics consumer.auto.offset.reset consumer.check.crcs consumer.fetch.max.bytes consumer.fetch.max.wait.ms consumer.fetch.min.bytes consumer.heartbeat.interval.ms consumer.key.deserializer consumer.max.partition.fetch.bytes consumer.max.poll.records consumer.metadata.max.age.ms consumer.partition.assignment.strategy consumer.reconnect.backoff.max.ms consumer.reconnect.backoff.ms consumer.request.timeout.ms consumer.retry.backoff.ms consumer.session.timeout.ms consumer.value.deserializer
-
Toutes les autres propriétés de configuration qui ne commencent pas par les préfixes
producer.
ou sontconsumer.
prises en charge, sauf les propriétés suivantes.access.control. admin. admin.listeners.https. client. connect. inter.worker. internal. listeners.https. metrics. metrics.context. rest. sasl. security. socket. ssl. topic.tracking. worker. bootstrap.servers config.storage.topic connections.max.idle.ms connector.client.config.override.policy group.id listeners metric.reporters plugin.path receive.buffer.bytes response.http.headers.config scheduled.rebalance.max.delay.ms send.buffer.bytes status.storage.topic
Pour plus d'informations sur les propriétés de configuration de worker et ce qu'elles représentent, consultez Kafka Connect Configs
Création d'une configuration de worker personnalisée
Création d'une configuration de travail personnalisée à l'aide du AWS Management Console
Ouvrez la console Amazon MSK à l'adresse https://console.aws.amazon.com/msk/
. -
Dans le volet gauche, sous MSK Connect, choisissez Configurations de worker.
-
Choisissez Créer une configuration de worker.
-
Entrez un nom et une description facultative, puis ajoutez les propriétés et les valeurs que vous souhaitez leur attribuer.
-
Choisissez Créer une configuration de worker.
Pour utiliser l'API MSK Connect afin de créer une configuration de travail, consultez CreateWorkerConfiguration.
Gestion des décalages du connecteur source en utilisant offset.storage.topic
Cette section fournit des informations qui vous aideront à gérer les décalages des connecteurs source à l'aide de la rubrique Stockage des décalages. La rubrique du stockage des décalages est une rubrique interne que Kafka Connect utilise pour stocker les décalages de configuration des connecteurs et des tâches.
Utilisation de la rubrique de stockage des décalages par défaut
Par défaut, Amazon MSK Connect génère une nouvelle rubrique de stockage des décalages sur votre cluster Kafka pour chaque connecteur que vous créez. MSK construit le nom de rubrique par défaut en utilisant des parties de l'ARN du connecteur. Par exemple, __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
.
Spécification de votre propre rubrique de stockage des décalages
Pour assurer la continuité des décalages entre les connecteurs source, vous pouvez utiliser une rubrique de stockage des décalages de votre choix au lieu de la rubrique par défaut. La spécification d'une rubrique de stockage des décalages vous aide à accomplir des tâches telles que la création d'un connecteur source qui reprend la lecture à partir du dernier décalage d'un connecteur précédent.
Pour spécifier une rubrique de stockage des décalages, vous devez fournir une valeur pour la propriété offset.storage.topic
dans votre configuration de worker avant de créer un connecteur. Si vous souhaitez réutiliser la rubrique de stockage des décalages pour utiliser les décalages d'un connecteur créé précédemment, vous devez donner au nouveau connecteur le même nom que l'ancien connecteur. Si vous créez une rubrique de stockage de décalages personnalisée, vous devez définir cleanup.policy
compact
dans la configuration de votre rubrique.
Note
Si vous spécifiez une rubrique de stockage des décalages lorsque vous créez un connecteur récepteur, MSK Connect crée la rubrique si elle n'existe pas déjà. Toutefois, cette rubrique ne sera pas utilisée pour enregistrer les décalages du connecteur.
Les décalages du connecteur récepteur sont plutôt gérés à l'aide du protocole de groupe de consommateurs Kafka. Chaque connecteur récepteur crée un groupe nommé connect-{CONNECTOR_NAME}
. Tant que le groupe de consommateurs existe, tous les connecteurs récepteurs successifs que vous créez avec la même valeur CONNECTOR_NAME
seront maintenus à partir du dernier décalage validé.
Exemple Spécification d'une rubrique de stockage des décalages pour recréer un connecteur source avec une configuration mise à jour
Supposons que vous disposiez d'un connecteur CDC (Change Data Capture) et que vous souhaitiez modifier la configuration du connecteur sans perdre votre place dans le flux CDC. Vous ne pouvez pas mettre à jour la configuration de connecteur existante, mais vous pouvez supprimer le connecteur et en créer un autre portant le même nom. Pour indiquer au nouveau connecteur par où commencer à lire dans le flux CDC, vous pouvez spécifier la rubrique de stockage des décalages de l'ancien connecteur dans votre configuration de worker. Les étapes suivantes expliquent comment effectuer cette tâche.
-
Sur votre machine cliente, exécutez la commande suivante pour trouver le nom de la rubrique de stockage des décalages de votre connecteur. Remplacez
par la chaîne de l'agent d'amorçage de votre cluster. Pour obtenir des instructions sur l'obtention de votre chaîne de l'agent d'amorçage, consultez Obtention des agents d'amorçage pour un cluster Amazon MSK.<bootstrapBrokerString>
<path-to-your-kafka-installation>
/bin/kafka-topics.sh --list --bootstrap-server<bootstrapBrokerString>
La sortie suivante présente une liste de toutes les rubriques du cluster, y compris les rubriques du connecteur interne par défaut. Dans cet exemple, le connecteur CDC existant utilise la rubrique de stockage des décalages par défaut créée par MSK Connect. C'est pourquoi la rubrique de stockage des décalages est appelée
__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
.__consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
-
Ouvrez la console Amazon MSK sur https://console.aws.amazon.com/msk/
. -
Choisissez votre connecteur dans la liste des connecteurs. Copiez et enregistrez le contenu du champ de configuration du connecteur afin de pouvoir le modifier et l'utiliser pour créer le nouveau connecteur.
-
Pour supprimer la connecteur, choisissez Supprimer. Puis saisissez le nom du connecteur dans le champ d'entrée du texte pour confirmer la suppression.
-
Créez une configuration de worker personnalisée avec des valeurs adaptées à votre scénario. Pour obtenir des instructions, veuillez consulter Création d'une configuration de worker personnalisée.
Dans votre configuration de worker, vous devez spécifier le nom de la rubrique de stockage des décalages que vous avez précédemment récupérée en tant que valeur pour
offset.storage.topic
comme dans la configuration suivante.config.providers.secretManager.param.aws.region=us-east-1 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=
__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
-
Important
Vous devez donner à votre nouveau connecteur le même nom que l'ancien connecteur.
Créez un nouveau connecteur à l'aide de la configuration de worker que vous avez configurée à l'étape précédente. Pour obtenir des instructions, veuillez consulter Création d'un connecteur.
Considérations
Tenez compte des éléments suivants lorsque vous gérez les décalages du connecteur source.
-
Pour spécifier une rubrique de stockage des décalages, indiquez le nom de la rubrique Kafka dans lequel les décalages des connecteurs sont stockés en tant que valeur pour
offset.storage.topic
dans votre configuration de worker. -
Soyez prudent lorsque vous modifiez la configuration d'un connecteur. La modification des valeurs de configuration peut entraîner un comportement involontaire du connecteur si un connecteur source utilise les valeurs de la configuration pour saisir des enregistrements de décalage. Pour plus d'informations, nous vous recommandons de consulter la documentation de votre plugin.
-
Personnaliser le nombre de partitions par défaut : en plus de personnaliser la configuration de worker en ajoutant
offset.storage.topic
, vous pouvez personnaliser le nombre de partitions pour les rubriques de stockage des décalages et des statuts. Les partitions par défaut pour les rubriques internes sont les suivantes.-
config.storage.topic
: 1, non configurable, doit être une rubrique à partition unique -
offset.storage.topic
: 25, configurable en fournissantoffset.storage.partitions
-
status.storage.topic
: 5, configurable en fournissantstatus.storage.partitions
-
-
Suppression manuelle des rubriques : Amazon MSK Connect crée de nouvelles rubriques internes à Kafka Connect (le nom de la rubrique commence par
__amazon_msk_connect
) à chaque déploiement de connecteurs. Les anciennes rubriques attachées à des connecteurs supprimés ne sont pas automatiquement supprimées car les rubriques internes, telles queoffset.storage.topic
, peuvent être réutilisées entre les connecteurs. Cependant, vous pouvez supprimer manuellement les rubriques internes non utilisées créées par MSK Connect. Les rubriques internes sont nommées selon le format__amazon_msk_connect_<offsets|status|configs>_
.connector_name
_connector_id
L'expression régulière
__amazon_msk_connect_<offsets|status|configs>_
peut être utilisée pour supprimer les rubriques internes. Vous ne devez pas supprimer une rubrique interne actuellement utilisée par un connecteur en cours d'exécution.connector_name
_connector_id
-
Utilisation du même nom pour les rubriques internes créées par MSK Connect : si vous souhaitez réutiliser la rubrique de stockage des décalages pour utiliser les décalages d'un connecteur créé précédemment, vous devez donner au nouveau connecteur le même nom que l'ancien connecteur. La propriété
offset.storage.topic
peut être définie à l'aide de la configuration de worker pour attribuer le même nom àoffset.storage.topic
et réutilisée entre différents connecteurs. Cette configuration est décrite dans Gestion des décalages de connecteurs. MSK Connect n'autorise pas que les différents connecteurs partagentconfig.storage.topic
etstatus.storage.topic
. Ces rubriques sont créées chaque fois que vous créez un nouveau connecteur dans MSKC. Ils sont automatiquement nommés selon le format__amazon_msk_connect_<status|configs>_
et sont donc différents selon les connecteurs que vous créez.connector_name
_connector_id