Workers - Amazon Managed Streaming for Apache Kafka

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Workers

Un worker est un processus de machine virtuelle Java (JVM) qui exécute la logique du connecteur. Chaque worker crée un ensemble de tâches qui s'exécutent dans des threads parallèles et se chargent de copier les données. Les tâches ne stockent pas l'état et peuvent donc être démarrées, arrêtées ou redémarrées à tout moment afin de fournir un pipeline de données résilient et évolutif. Les modifications du nombre de workers, qu'elles soient dues à un événement de mise à l'échelle ou à des défaillances inattendues, sont automatiquement détectées par les autres workers. Ils se coordonnent pour rééquilibrer les tâches entre les workers restants. Les workers de Connect utilisent les groupes de consommateurs d'Apache Kafka pour coordonner et rééquilibrer leurs tâches.

Si les exigences de capacité de votre connecteur sont variables ou difficiles à estimer, vous pouvez laisser MSK Connect ajuster le nombre de workers selon vos besoins entre une limite inférieure et une limite supérieure que vous spécifiez. Vous pouvez également spécifier le nombre exact de workers que vous souhaitez exécuter sur votre logique de connecteur. Pour plus d’informations, consultez Capacité du connecteur.

Les employés de MSK Connect consomment des adresses IP

Les employés de MSK Connect consomment des adresses IP dans les sous-réseaux fournis par le client. Chaque travailleur utilise une adresse IP provenant de l'un des sous-réseaux fournis par le client. Vous devez vous assurer que vous disposez d'un nombre suffisant d'adresses IP disponibles dans les sous-réseaux fournis à une CreateConnector demande pour tenir compte de leur capacité spécifiée, en particulier lors du dimensionnement automatique des connecteurs où le nombre de travailleurs peut fluctuer.

Configuration du processus worker par défaut

MSK Connect fournit la configuration de worker par défaut suivante :

key.converter=org.apache.kafka.connect.storage.StringConverter value.converter=org.apache.kafka.connect.storage.StringConverter

Propriétés de configuration de l'environnement de worker compatibles

MSK Connect fournit une configuration de worker par défaut. Vous avez également la possibilité de créer une configuration de worker personnalisée à utiliser avec vos connecteurs. La liste suivante inclut des informations sur les propriétés de configuration de worker prises en charge ou non par Amazon MSK Connect.

  • Seules les propriétés key.converter et value.converter sont obligatoires.

  • MSK Connect prend en charge les propriétés de configuration producer. suivantes.

    producer.acks producer.batch.size producer.buffer.memory producer.compression.type producer.enable.idempotence producer.key.serializer producer.max.request.size producer.metadata.max.age.ms producer.metadata.max.idle.ms producer.partitioner.class producer.reconnect.backoff.max.ms producer.reconnect.backoff.ms producer.request.timeout.ms producer.retry.backoff.ms producer.value.serializer
  • MSK Connect prend en charge les propriétés de configuration consumer. suivantes.

    consumer.allow.auto.create.topics consumer.auto.offset.reset consumer.check.crcs consumer.fetch.max.bytes consumer.fetch.max.wait.ms consumer.fetch.min.bytes consumer.heartbeat.interval.ms consumer.key.deserializer consumer.max.partition.fetch.bytes consumer.max.poll.records consumer.metadata.max.age.ms consumer.partition.assignment.strategy consumer.reconnect.backoff.max.ms consumer.reconnect.backoff.ms consumer.request.timeout.ms consumer.retry.backoff.ms consumer.session.timeout.ms consumer.value.deserializer
  • Toutes les autres propriétés de configuration qui ne commencent pas par les préfixes producer. ou sont consumer. prises en charge, sauf les propriétés suivantes.

    access.control. admin. admin.listeners.https. client. connect. inter.worker. internal. listeners.https. metrics. metrics.context. rest. sasl. security. socket. ssl. topic.tracking. worker. bootstrap.servers config.storage.topic connections.max.idle.ms connector.client.config.override.policy group.id listeners metric.reporters plugin.path receive.buffer.bytes response.http.headers.config scheduled.rebalance.max.delay.ms send.buffer.bytes status.storage.topic

Pour plus d'informations sur les propriétés de configuration de worker et ce qu'elles représentent, consultez Kafka Connect Configs dans la documentation Apache Kafka.

Création d'une configuration de worker personnalisée

Création d'une configuration de travail personnalisée à l'aide du AWS Management Console
  1. Ouvrez la console Amazon MSK à l'adresse https://console.aws.amazon.com/msk/.

  2. Dans le volet gauche, sous MSK Connect, choisissez Configurations de worker.

  3. Choisissez Créer une configuration de worker.

  4. Entrez un nom et une description facultative, puis ajoutez les propriétés et les valeurs que vous souhaitez leur attribuer.

  5. Choisissez Créer une configuration de worker.

Pour utiliser l'API MSK Connect afin de créer une configuration de travail, consultez CreateWorkerConfiguration.

Gestion des décalages du connecteur source en utilisant offset.storage.topic

Cette section fournit des informations qui vous aideront à gérer les décalages des connecteurs source à l'aide de la rubrique Stockage des décalages. La rubrique du stockage des décalages est une rubrique interne que Kafka Connect utilise pour stocker les décalages de configuration des connecteurs et des tâches.

Utilisation de la rubrique de stockage des décalages par défaut

Par défaut, Amazon MSK Connect génère une nouvelle rubrique de stockage des décalages sur votre cluster Kafka pour chaque connecteur que vous créez. MSK construit le nom de rubrique par défaut en utilisant des parties de l'ARN du connecteur. Par exemple, __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.

Spécification de votre propre rubrique de stockage des décalages

Pour assurer la continuité des décalages entre les connecteurs source, vous pouvez utiliser une rubrique de stockage des décalages de votre choix au lieu de la rubrique par défaut. La spécification d'une rubrique de stockage des décalages vous aide à accomplir des tâches telles que la création d'un connecteur source qui reprend la lecture à partir du dernier décalage d'un connecteur précédent.

Pour spécifier une rubrique de stockage des décalages, vous devez fournir une valeur pour la propriété offset.storage.topic dans votre configuration de worker avant de créer un connecteur. Si vous souhaitez réutiliser la rubrique de stockage des décalages pour utiliser les décalages d'un connecteur créé précédemment, vous devez donner au nouveau connecteur le même nom que l'ancien connecteur. Si vous créez une rubrique de stockage de décalages personnalisée, vous devez définir cleanup.policy sur compact dans la configuration de votre rubrique.

Note

Si vous spécifiez une rubrique de stockage des décalages lorsque vous créez un connecteur récepteur, MSK Connect crée la rubrique si elle n'existe pas déjà. Toutefois, cette rubrique ne sera pas utilisée pour enregistrer les décalages du connecteur.

Les décalages du connecteur récepteur sont plutôt gérés à l'aide du protocole de groupe de consommateurs Kafka. Chaque connecteur récepteur crée un groupe nommé connect-{CONNECTOR_NAME}. Tant que le groupe de consommateurs existe, tous les connecteurs récepteurs successifs que vous créez avec la même valeur CONNECTOR_NAME seront maintenus à partir du dernier décalage validé.

Exemple Spécification d'une rubrique de stockage des décalages pour recréer un connecteur source avec une configuration mise à jour

Supposons que vous disposiez d'un connecteur CDC (Change Data Capture) et que vous souhaitiez modifier la configuration du connecteur sans perdre votre place dans le flux CDC. Vous ne pouvez pas mettre à jour la configuration de connecteur existante, mais vous pouvez supprimer le connecteur et en créer un autre portant le même nom. Pour indiquer au nouveau connecteur par où commencer à lire dans le flux CDC, vous pouvez spécifier la rubrique de stockage des décalages de l'ancien connecteur dans votre configuration de worker. Les étapes suivantes expliquent comment effectuer cette tâche.

  1. Sur votre machine cliente, exécutez la commande suivante pour trouver le nom de la rubrique de stockage des décalages de votre connecteur. Remplacez <bootstrapBrokerString> par la chaîne de l'agent d'amorçage de votre cluster. Pour obtenir des instructions sur l'obtention de votre chaîne de l'agent d'amorçage, consultez Obtention des agents d'amorçage pour un cluster Amazon MSK.

    <path-to-your-kafka-installation>/bin/kafka-topics.sh --list --bootstrap-server <bootstrapBrokerString>

    La sortie suivante présente une liste de toutes les rubriques du cluster, y compris les rubriques du connecteur interne par défaut. Dans cet exemple, le connecteur CDC existant utilise la rubrique de stockage des décalages par défaut créée par MSK Connect. C'est pourquoi la rubrique de stockage des décalages est appelée __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2.

    __consumer_offsets __amazon_msk_canary __amazon_msk_connect_configs_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 __amazon_msk_connect_status_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2 my-msk-topic-1 my-msk-topic-2
  2. Ouvrez la console Amazon MSK sur https://console.aws.amazon.com/msk/.

  3. Choisissez votre connecteur dans la liste des connecteurs. Copiez et enregistrez le contenu du champ de configuration du connecteur afin de pouvoir le modifier et l'utiliser pour créer le nouveau connecteur.

  4. Pour supprimer la connecteur, choisissez Supprimer. Puis saisissez le nom du connecteur dans le champ d'entrée du texte pour confirmer la suppression.

  5. Créez une configuration de worker personnalisée avec des valeurs adaptées à votre scénario. Pour obtenir des instructions, veuillez consulter Création d'une configuration de worker personnalisée.

    Dans votre configuration de worker, vous devez spécifier le nom de la rubrique de stockage des décalages que vous avez précédemment récupérée en tant que valeur pour offset.storage.topic comme dans la configuration suivante.

    config.providers.secretManager.param.aws.region=us-east-1 key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager offset.storage.topic=__amazon_msk_connect_offsets_my-mskc-connector_12345678-09e7-4abc-8be8-c657f7e4ff32-2
  6. Important

    Vous devez donner à votre nouveau connecteur le même nom que l'ancien connecteur.

    Créez un nouveau connecteur à l'aide de la configuration de worker que vous avez configurée à l'étape précédente. Pour obtenir des instructions, veuillez consulter Création d'un connecteur.

Considérations

Tenez compte des éléments suivants lorsque vous gérez les décalages du connecteur source.

  • Pour spécifier une rubrique de stockage des décalages, indiquez le nom de la rubrique Kafka dans lequel les décalages des connecteurs sont stockés en tant que valeur pour offset.storage.topic dans votre configuration de worker.

  • Soyez prudent lorsque vous modifiez la configuration d'un connecteur. La modification des valeurs de configuration peut entraîner un comportement involontaire du connecteur si un connecteur source utilise les valeurs de la configuration pour saisir des enregistrements de décalage. Pour plus d'informations, nous vous recommandons de consulter la documentation de votre plugin.

  • Personnaliser le nombre de partitions par défaut : en plus de personnaliser la configuration de worker en ajoutant offset.storage.topic, vous pouvez personnaliser le nombre de partitions pour les rubriques de stockage des décalages et des statuts. Les partitions par défaut pour les rubriques internes sont les suivantes.

    • config.storage.topic : 1, non configurable, doit être une rubrique à partition unique

    • offset.storage.topic : 25, configurable en fournissant offset.storage.partitions

    • status.storage.topic : 5, configurable en fournissant status.storage.partitions

  • Suppression manuelle des rubriques : Amazon MSK Connect crée de nouvelles rubriques internes à Kafka Connect (le nom de la rubrique commence par __amazon_msk_connect) à chaque déploiement de connecteurs. Les anciennes rubriques attachées à des connecteurs supprimés ne sont pas automatiquement supprimées car les rubriques internes, telles que offset.storage.topic, peuvent être réutilisées entre les connecteurs. Cependant, vous pouvez supprimer manuellement les rubriques internes non utilisées créées par MSK Connect. Les rubriques internes sont nommées selon le format __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id.

    L'expression régulière __amazon_msk_connect_<offsets|status|configs>_connector_name_connector_id peut être utilisée pour supprimer les rubriques internes. Vous ne devez pas supprimer une rubrique interne actuellement utilisée par un connecteur en cours d'exécution.

  • Utilisation du même nom pour les rubriques internes créées par MSK Connect  : si vous souhaitez réutiliser la rubrique de stockage des décalages pour utiliser les décalages d'un connecteur créé précédemment, vous devez donner au nouveau connecteur le même nom que l'ancien connecteur. La propriété offset.storage.topic peut être définie à l'aide de la configuration de worker pour attribuer le même nom à offset.storage.topic et réutilisée entre différents connecteurs. Cette configuration est décrite dans Gestion des décalages de connecteurs. MSK Connect n'autorise pas que les différents connecteurs partagent config.storage.topic etstatus.storage.topic. Ces rubriques sont créées chaque fois que vous créez un nouveau connecteur dans MSKC. Ils sont automatiquement nommés selon le format __amazon_msk_connect_<status|configs>_connector_name_connector_id et sont donc différents selon les connecteurs que vous créez.