Traitement des MSK messages Amazon avec Lambda - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Traitement des MSK messages Amazon avec Lambda

Note

Si vous souhaitez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.

Ajouter Amazon MSK en tant que source d'événements

Pour créer un mappage de sources d'événements, ajoutez Amazon MSK en tant que déclencheur de fonction Lambda à l'aide de la console Lambda, de an AWS SDKou du ().AWS Command Line InterfaceAWS CLI Notez que lorsque vous ajoutez Amazon MSK en tant que déclencheur, Lambda prend en charge les VPC paramètres du MSK cluster Amazon, et non ceux de la fonction Lambda. VPC

Cette section explique comment créer un mappage de source d'événement à l'aide de la console Lambda et de l' AWS CLI.

Prérequis

  • Un MSK cluster Amazon et un sujet Kafka. Pour plus d'informations, consultez Getting Started Using Amazon MSK dans le guide du développeur Amazon Managed Streaming for Apache Kafka.

  • Rôle d'exécution autorisé à accéder aux AWS ressources utilisées par votre MSK cluster.

Identifiant de groupe de consommateurs personnalisable

Lorsque vous configurez Kafka comme source d’événements, vous pouvez spécifier un identifiant de groupe de consommateurs. Cet identifiant de groupe de consommateurs est un identifiant existant pour le groupe de clients Kafka auquel vous souhaitez rattacher votre fonction Lambda. Vous pouvez utiliser cette fonction pour migrer facilement toutes les configurations de traitement d’enregistrements Kafka en cours depuis d’autres clients vers Lambda.

Si vous spécifiez un identifiant de groupe de consommateurs et qu’il existe d’autres sondeurs actifs au sein de ce groupe de consommateurs, Kafka distribue des messages à tous les consommateurs. En d’autres termes, Lambda ne reçoit pas l’intégralité du message relatif au sujet Kafka. Si vous souhaitez que Lambda gère tous les messages de la rubrique, désactivez tous les autres sondeurs de ce groupe de consommateurs.

De plus, si vous spécifiez un identifiant de groupe de consommateurs et que Kafka trouve un groupe de consommateurs existant valide avec le même identifiant, Lambda ignore le paramètre StartingPosition pour le mappage des sources d’événements. Lambda commence plutôt à traiter les enregistrements en fonction de la compensation engagée par le groupe de consommateurs. Si vous spécifiez un identifiant de groupe de consommateurs et que Kafka ne trouve aucun groupe de consommateurs existant, Lambda configure votre source d’événement avec le StartingPosition spécifié.

L’identifiant du groupe de consommateurs que vous spécifiez doit être unique parmi toutes vos sources d’événements Kafka. Après avoir créé un mappage des sources d’événements Kafka avec l’identifiant de groupe de consommateurs spécifié, vous ne pouvez plus mettre à jour cette valeur.

Ajouter un MSK déclencheur Amazon (console)

Suivez ces étapes pour ajouter votre MSK cluster Amazon et un sujet Kafka comme déclencheur pour votre fonction Lambda.

Pour ajouter un MSK déclencheur Amazon à votre fonction Lambda (console)
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez le nom de votre fonction Lambda.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).

  4. Sous Trigger configuration (Configuration du déclencheur), procédez comme suit :

    1. Choisissez le type de déclencheur MSK.

    2. Pour MSKcluster, sélectionnez votre cluster.

    3. Pour Batch size (Taille de lot), entrez le nombre maximum d’enregistrements à recevoir dans un même lot.

    4. Pour Batch window, veuillez saisir l’intervalle de temps maximal en secondes nécessaire à Lambda pour collecter des enregistrements avant d’invoquer la fonction.

    5. Dans Topic name (Nom de la rubrique), saisissez le nom d’une rubrique Kafka.

    6. (Facultatif) Pour l’identifiant de groupe de consommateurs, entrez l’identifiant d’un groupe de consommateurs Kafka à rejoindre.

    7. (Facultatif) Pour Position de départ, choisissez Dernier pour commencer à lire le flux à partir du dernier enregistrement, Supprimer l’horizon pour commencer au premier enregistrement disponible ou À l’horodatage pour spécifier un horodatage à partir duquel commencer la lecture.

    8. (Facultatif) Pour Authentification, choisissez la clé secrète pour vous authentifier auprès des courtiers de votre MSK cluster.

    9. Pour créer le déclencheur dans un état désactivé pour le test (recommandé), désactivez Enable trigger (Activer le déclencheur). Ou, pour activer le déclencheur immédiatement, sélectionnezActiver un déclencheur.

  5. Pour créer le déclencheur, choisissez Add (Ajouter).

Ajouter un MSK déclencheur Amazon (AWS CLI)

Utilisez les exemples de AWS CLI commandes suivants pour créer et afficher un MSK déclencheur Amazon pour votre fonction Lambda.

Création d'un déclencheur à l'aide du AWS CLI

Exemple — Crée un mappage des sources d'événements pour le cluster qui utilise IAM l'authentification

L'exemple suivant utilise la create-event-source-mapping AWS CLI commande pour mapper une fonction Lambda nommée my-kafka-function à une rubrique Kafka nommée. AWSKafkaTopic La position de départ de la rubrique est définie sur LATEST. Lorsque le cluster utilise l'authentification IAM basée sur les rôles, vous n'avez pas besoin d'SourceAccessConfigurationobjet. Exemple :

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Exemple — Crée un mappage des sources d'événements pour le cluster qui utilise SCRAM l'authentificationSASL/

Si le cluster utilise l'SCRAMauthentificationSASL/, vous devez inclure un SourceAccessConfigurationobjet qui spécifie SASL_SCRAM_512_AUTH et un secret Secrets ManagerARN.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
Exemple — Crée un mappage des sources d'événements pour le cluster qui utilise TLS l'authentification m

Si le cluster utilise l'TLSauthentification m, vous devez inclure un SourceAccessConfigurationobjet qui spécifie CLIENT_CERTIFICATE_TLS_AUTH et un secret Secrets ManagerARN.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

Pour plus d'informations, consultez la documentation CreateEventSourceMappingAPIde référence.

Affichage de l'état à l'aide du AWS CLI

L'exemple suivant utilise la get-event-source-mapping AWS CLI commande pour décrire l'état du mappage des sources d'événements que vous avez créé.

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Paramètres MSK de configuration Amazon

Tous les types de sources d'événements Lambda partagent les mêmes opérations CreateEventSourceMapping. UpdateEventSourceMappingAPI Toutefois, seuls certains paramètres s'appliquent à AmazonMSK.

Paramètre Obligatoire Par défaut Remarques

AmazonManagedKafkaEventSourceConfig

N

Contient le ConsumerGroupId champ, dont la valeur par défaut est unique.

Peut définir uniquement sur Create (Créer)

BatchSize

N

100

Maximum : 10 000.

Activé

N

Activées

none

EventSourceArn

Y

N/A

Peut définir uniquement sur Create (Créer)

FunctionName

Y

N/A

none

FilterCriteria

N

N/A

Contrôlez les événements que Lambda envoie à votre fonction

MaximumBatchingWindowInSeconds

N

500 ms

Comportement de traitement par lots

SourceAccessConfigurations

N

Pas d’informations d’identification

SASL/SCRAMou CLIENT _ _ CERTIFICATE TLS _ AUTH (mutuellesTLS) identifiants d'authentification pour votre source d'événement

StartingPosition

Y

N/A

AT_TIMESTAMP, TRIM _HORIZON, ou LATEST

Peut définir uniquement sur Create (Créer)

StartingPositionTimestamp

N

N/A

Obligatoire s' StartingPosition il est défini sur AT_ TIMESTAMP

Rubriques

Y

N/A

Nom de rubrique Kafka

Peut définir uniquement sur Create (Créer)

Création de mappages de sources d’événements entre comptes

Vous pouvez utiliser la connectivité VPC multi-privée pour connecter une fonction Lambda à un MSK cluster provisionné dans un autre. Compte AWS Utilisations de VPC connectivité multiple AWS PrivateLink, ce qui permet de maintenir tout le trafic sur le AWS réseau.

Note

Vous ne pouvez pas créer de mappages de sources d'événements entre comptes pour les clusters sans serveurMSK.

Pour créer un mappage des sources d'événements entre comptes, vous devez d'abord configurer la VPC multiconnectivité pour le MSK cluster. Lorsque vous créez le mappage des sources d'événements, utilisez la VPC connexion gérée ARN plutôt que le clusterARN, comme indiqué dans les exemples suivants. Le CreateEventSourceMappingfonctionnement varie également en fonction du type d'authentification utilisé par le MSK cluster.

Exemple — Créez un mappage des sources d'événements entre comptes pour le cluster qui utilise l'authentification IAM

Lorsque le cluster utilise l'authentification IAM basée sur les rôles, vous n'avez pas besoin d'SourceAccessConfigurationobjet. Exemple :

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Exemple — Crée un mappage des sources d'événements entre comptes pour le cluster qui utilise l'authentificationSASL/SCRAM

Si le cluster utilise l'SCRAMauthentificationSASL/, vous devez inclure un SourceAccessConfigurationobjet qui spécifie SASL_SCRAM_512_AUTH et un secret Secrets ManagerARN.

Il existe deux manières d'utiliser les secrets pour les mappages de sources d'MSKévénements Amazon entre comptes avec l'authentificationSASL/SCRAM:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
Exemple — Crée un mappage des sources d'événements entre comptes pour le cluster qui utilise l'authentification m TLS

Si le cluster utilise l'TLSauthentification m, vous devez inclure un SourceAccessConfigurationobjet qui spécifie CLIENT_CERTIFICATE_TLS_AUTH et un secret Secrets ManagerARN. Le secret peut être stocké dans le compte du cluster ou dans le compte de la fonction Lambda.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Utilisation d'un MSK cluster Amazon comme source d'événements

Lorsque vous ajoutez votre MSK cluster Apache Kafka ou Amazon comme déclencheur pour votre fonction Lambda, le cluster est utilisé comme source d'événements.

Lambda lit les données d'événements des sujets Kafka que vous spécifiez Topics dans une CreateEventSourceMappingdemande, en fonction de StartingPosition ce que vous spécifiez. Lorsque le traitement a réussi, votre rubrique Kafka est validée dans votre cluster Kafka.

Si vous spécifiez StartingPosition comme LATEST, Lambda commence à lire à partir du dernier message dans chaque partition de la rubrique. Un certain temps pouvant s’écouler après la configuration du déclencheur avant que Lambda commence à lire les messages, Lambda ne lit aucun message produit pendant cette fenêtre de temps.

Lambda lit les messages de manière séquentielle pour chaque partition de rubrique Kafka. Une seule charge utile Lambda peut contenir des messages provenant de plusieurs partitions. Lorsque d'autres enregistrements sont disponibles, Lambda continue de traiter les enregistrements par lots, en fonction de la BatchSize valeur que vous spécifiez dans une CreateEventSourceMappingdemande, jusqu'à ce que votre fonction aborde le sujet.

Après avoir traité chaque lot, Lambda valide les décalages des messages dans celui-ci. Si votre fonction renvoie une erreur pour l'un des messages d'un lot, Lambda réessaie le lot de messages complet jusqu'à ce que le traitement réussisse ou que les messages expirent. Vous pouvez envoyer les enregistrements qui échouent à toutes les tentatives vers une destination en cas d'échec pour un traitement ultérieur.

Note

Alors que les fonctions Lambda ont généralement un délai d'expiration maximal de 15 minutes, les mappages de sources d'événements pour Amazon, Apache Kafka autogéré, MSK Amazon DocumentDB et Amazon MQ pour ActiveMQ et RabbitMQ ne prennent en charge que les fonctions dont le délai d'expiration maximal est de 14 minutes. Cette contrainte garantit que le mappage des sources d’événements peut gérer correctement les erreurs de fonction et effectuer de nouvelles tentatives.

Positions de départ des interrogations et des flux

Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.

  • Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.

  • Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.

Ce comportement signifie que si vous spécifiez LATEST comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON ou AT_TIMESTAMP.

CloudWatch Métriques Amazon

Lambda émet la métrique OffsetLag pendant que votre fonction traite les registres. La valeur de cette métrique est la différence de décalage entre le dernier enregistrement inscrit dans la rubrique source de l’événement Kafka et le dernier enregistrement traité par le groupe de consommateurs de votre fonction. Vous pouvez utiliser OffsetLag pour estimer la latence entre le moment où un enregistrement est ajouté et celui où votre groupe de consommateurs le traite.

Une tendance à la hausse de OffsetLag peut indiquer des problèmes liés aux sondeurs dans le groupe de consommateurs de votre fonction. Pour de plus amples informations, veuillez consulter Afficher les métriques des fonctions Lambda.

Mise à l'échelle automatique de la source MSK d'événements Amazon

Lorsque vous créez initialement une source d'MSKévénements Amazon, Lambda alloue un consommateur pour traiter toutes les partitions du sujet Kafka. Chaque consommateur dispose de plusieurs processeurs exécutés en parallèle pour gérer des charges de travail accrues. Lambda augmente ou diminue automatiquement le nombre de consommateurs, en fonction de la charge de travail. Pour préserver l’ordre des messages dans chaque partition, le nombre maximum de consommateurs est de un par partition dans la rubrique.

Toutes les minutes, Lambda évalue le décalage de consommateurs de toutes les partitions dans la rubrique. Si le décalage est trop élevé, la partition reçoit des messages plus rapidement que Lambda ne peut les traiter. Si nécessaire, Lambda ajoute ou supprime des consommateurs dans la rubrique. Le processus de mise à l’échelle consistant à ajouter ou à supprimer des consommateurs a lieu dans les trois minutes suivant l’évaluation.

Si votre fonction Lambda cible est limitée, Lambda réduit le nombre de consommateurs. Cette action réduit la charge de travail de la fonction en diminuant le nombre de messages que les consommateurs peuvent échanger avec la fonction.

Pour contrôler le débit de votre rubrique Kafka, consultez la Métrique du décalage de délai Lambda émet pendant que votre fonction traite les enregistrements.

Pour vérifier le nombre d’invocations de fonctions qui se produisent en parallèle, vous pouvez également surveiller les métriques de simultanéité pour votre fonction.