Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Traitement des MSK messages Amazon avec Lambda
Note
Si vous souhaitez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.
Rubriques
- Ajouter Amazon MSK en tant que source d'événements
- Paramètres MSK de configuration Amazon
- Création de mappages de sources d’événements entre comptes
- Utilisation d'un MSK cluster Amazon comme source d'événements
- Positions de départ des interrogations et des flux
- CloudWatch Métriques Amazon
- Mise à l'échelle automatique de la source MSK d'événements Amazon
Ajouter Amazon MSK en tant que source d'événements
Pour créer un mappage de sources d'événements, ajoutez Amazon MSK en tant que déclencheur de fonction Lambda à l'aide de la console Lambda, de an AWS SDK
Cette section explique comment créer un mappage de source d'événement à l'aide de la console Lambda et de l' AWS CLI.
Prérequis
-
Un MSK cluster Amazon et un sujet Kafka. Pour plus d'informations, consultez Getting Started Using Amazon MSK dans le guide du développeur Amazon Managed Streaming for Apache Kafka.
-
Rôle d'exécution autorisé à accéder aux AWS ressources utilisées par votre MSK cluster.
Identifiant de groupe de consommateurs personnalisable
Lorsque vous configurez Kafka comme source d’événements, vous pouvez spécifier un identifiant de groupe de consommateurs. Cet identifiant de groupe de consommateurs est un identifiant existant pour le groupe de clients Kafka auquel vous souhaitez rattacher votre fonction Lambda. Vous pouvez utiliser cette fonction pour migrer facilement toutes les configurations de traitement d’enregistrements Kafka en cours depuis d’autres clients vers Lambda.
Si vous spécifiez un identifiant de groupe de consommateurs et qu’il existe d’autres sondeurs actifs au sein de ce groupe de consommateurs, Kafka distribue des messages à tous les consommateurs. En d’autres termes, Lambda ne reçoit pas l’intégralité du message relatif au sujet Kafka. Si vous souhaitez que Lambda gère tous les messages de la rubrique, désactivez tous les autres sondeurs de ce groupe de consommateurs.
De plus, si vous spécifiez un identifiant de groupe de consommateurs et que Kafka trouve un groupe de consommateurs existant valide avec le même identifiant, Lambda ignore le paramètre StartingPosition
pour le mappage des sources d’événements. Lambda commence plutôt à traiter les enregistrements en fonction de la compensation engagée par le groupe de consommateurs. Si vous spécifiez un identifiant de groupe de consommateurs et que Kafka ne trouve aucun groupe de consommateurs existant, Lambda configure votre source d’événement avec le StartingPosition
spécifié.
L’identifiant du groupe de consommateurs que vous spécifiez doit être unique parmi toutes vos sources d’événements Kafka. Après avoir créé un mappage des sources d’événements Kafka avec l’identifiant de groupe de consommateurs spécifié, vous ne pouvez plus mettre à jour cette valeur.
Ajouter un MSK déclencheur Amazon (console)
Suivez ces étapes pour ajouter votre MSK cluster Amazon et un sujet Kafka comme déclencheur pour votre fonction Lambda.
Pour ajouter un MSK déclencheur Amazon à votre fonction Lambda (console)
-
Ouvrez la page Functions (Fonctions)
de la console Lambda. -
Choisissez le nom de votre fonction Lambda.
-
Sous Function overview (Vue d’ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).
-
Sous Trigger configuration (Configuration du déclencheur), procédez comme suit :
-
Choisissez le type de déclencheur MSK.
-
Pour MSKcluster, sélectionnez votre cluster.
-
Pour Batch size (Taille de lot), entrez le nombre maximum d’enregistrements à recevoir dans un même lot.
-
Pour Batch window, veuillez saisir l’intervalle de temps maximal en secondes nécessaire à Lambda pour collecter des enregistrements avant d’invoquer la fonction.
-
Dans Topic name (Nom de la rubrique), saisissez le nom d’une rubrique Kafka.
-
(Facultatif) Pour l’identifiant de groupe de consommateurs, entrez l’identifiant d’un groupe de consommateurs Kafka à rejoindre.
-
(Facultatif) Pour Position de départ, choisissez Dernier pour commencer à lire le flux à partir du dernier enregistrement, Supprimer l’horizon pour commencer au premier enregistrement disponible ou À l’horodatage pour spécifier un horodatage à partir duquel commencer la lecture.
-
(Facultatif) Pour Authentification, choisissez la clé secrète pour vous authentifier auprès des courtiers de votre MSK cluster.
-
Pour créer le déclencheur dans un état désactivé pour le test (recommandé), désactivez Enable trigger (Activer le déclencheur). Ou, pour activer le déclencheur immédiatement, sélectionnezActiver un déclencheur.
-
-
Pour créer le déclencheur, choisissez Add (Ajouter).
Ajouter un MSK déclencheur Amazon (AWS CLI)
Utilisez les exemples de AWS CLI commandes suivants pour créer et afficher un MSK déclencheur Amazon pour votre fonction Lambda.
Création d'un déclencheur à l'aide du AWS CLI
Exemple — Crée un mappage des sources d'événements pour le cluster qui utilise IAM l'authentification
L'exemple suivant utilise la create-event-source-mappingmy-kafka-function
à une rubrique Kafka nommée. AWSKafkaTopic
La position de départ de la rubrique est définie sur LATEST
. Lorsque le cluster utilise l'authentification IAM basée sur les rôles, vous n'avez pas besoin d'SourceAccessConfigurationobjet. Exemple :
aws lambda create-event-source-mapping \ --event-source-arn
arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2
\ --topicsAWSKafkaTopic
\ --starting-positionLATEST
\ --function-namemy-kafka-function
Exemple — Crée un mappage des sources d'événements pour le cluster qui utilise SCRAM l'authentificationSASL/
Si le cluster utilise l'SCRAMauthentificationSASL/, vous devez inclure un SourceAccessConfigurationobjet qui spécifie SASL_SCRAM_512_AUTH
et un secret Secrets ManagerARN.
aws lambda create-event-source-mapping \ --event-source-arn
arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2
\ --topicsAWSKafkaTopic
\ --starting-positionLATEST
\ --function-namemy-kafka-function
--source-access-configurations'[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
Exemple — Crée un mappage des sources d'événements pour le cluster qui utilise TLS l'authentification m
Si le cluster utilise l'TLSauthentification m, vous devez inclure un SourceAccessConfigurationobjet qui spécifie CLIENT_CERTIFICATE_TLS_AUTH
et un secret Secrets ManagerARN.
aws lambda create-event-source-mapping \ --event-source-arn
arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2
\ --topicsAWSKafkaTopic
\ --starting-positionLATEST
\ --function-namemy-kafka-function
--source-access-configurations'[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
Pour plus d'informations, consultez la documentation CreateEventSourceMappingAPIde référence.
Affichage de l'état à l'aide du AWS CLI
L'exemple suivant utilise la get-event-source-mapping
aws lambda get-event-source-mapping \ --uuid
6d9bce8e-836b-442c-8070-74e77903c815
Paramètres MSK de configuration Amazon
Tous les types de sources d'événements Lambda partagent les mêmes opérations CreateEventSourceMapping. UpdateEventSourceMappingAPI Toutefois, seuls certains paramètres s'appliquent à AmazonMSK.
Paramètre | Obligatoire | Par défaut | Remarques |
---|---|---|---|
AmazonManagedKafkaEventSourceConfig |
N |
Contient le ConsumerGroupId champ, dont la valeur par défaut est unique. |
Peut définir uniquement sur Create (Créer) |
BatchSize |
N |
100 |
Maximum : 10 000. |
Activé |
N |
Activées |
none |
EventSourceArn |
Y |
N/A |
Peut définir uniquement sur Create (Créer) |
FunctionName |
Y |
N/A |
none |
FilterCriteria |
N |
N/A |
|
MaximumBatchingWindowInSeconds |
N |
500 ms |
|
SourceAccessConfigurations |
N |
Pas d’informations d’identification |
SASL/SCRAMou CLIENT _ _ CERTIFICATE TLS _ AUTH (mutuellesTLS) identifiants d'authentification pour votre source d'événement |
StartingPosition |
Y |
N/A |
AT_TIMESTAMP, TRIM _HORIZON, ou LATEST Peut définir uniquement sur Create (Créer) |
StartingPositionTimestamp |
N |
N/A |
Obligatoire s' StartingPosition il est défini sur AT_ TIMESTAMP |
Rubriques |
Y |
N/A |
Nom de rubrique Kafka Peut définir uniquement sur Create (Créer) |
Création de mappages de sources d’événements entre comptes
Vous pouvez utiliser la connectivité VPC multi-privée pour connecter une fonction Lambda à un MSK cluster provisionné dans un autre. Compte AWS Utilisations de VPC connectivité multiple AWS PrivateLink, ce qui permet de maintenir tout le trafic sur le AWS réseau.
Note
Vous ne pouvez pas créer de mappages de sources d'événements entre comptes pour les clusters sans serveurMSK.
Pour créer un mappage des sources d'événements entre comptes, vous devez d'abord configurer la VPC multiconnectivité pour le MSK cluster. Lorsque vous créez le mappage des sources d'événements, utilisez la VPC connexion gérée ARN plutôt que le clusterARN, comme indiqué dans les exemples suivants. Le CreateEventSourceMappingfonctionnement varie également en fonction du type d'authentification utilisé par le MSK cluster.
Exemple — Créez un mappage des sources d'événements entre comptes pour le cluster qui utilise l'authentification IAM
Lorsque le cluster utilise l'authentification IAM basée sur les rôles, vous n'avez pas besoin d'SourceAccessConfigurationobjet. Exemple :
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:
us-east-1:111122223333
:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7
\ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Exemple — Crée un mappage des sources d'événements entre comptes pour le cluster qui utilise l'authentificationSASL/SCRAM
Si le cluster utilise l'SCRAMauthentificationSASL/, vous devez inclure un SourceAccessConfigurationobjet qui spécifie SASL_SCRAM_512_AUTH
et un secret Secrets ManagerARN.
Il existe deux manières d'utiliser les secrets pour les mappages de sources d'MSKévénements Amazon entre comptes avec l'authentificationSASL/SCRAM:
-
Créez un secret dans le compte de fonction Lambda et synchronisez-le avec le secret du cluster. Créez une rotation pour synchroniser les deux secrets. Cette option vous permet de contrôler le secret depuis le compte de fonction.
-
Utilisez le secret associé au MSK cluster. Ce secret doit autoriser l’accès intercompte au compte de la fonction Lambda. Pour plus d'informations, consultez la section Autorisations relatives aux AWS Secrets Manager secrets pour les utilisateurs d'un autre compte.
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:
us-east-1:111122223333
:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7
\ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations'[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
Exemple — Crée un mappage des sources d'événements entre comptes pour le cluster qui utilise l'authentification m TLS
Si le cluster utilise l'TLSauthentification m, vous devez inclure un SourceAccessConfigurationobjet qui spécifie CLIENT_CERTIFICATE_TLS_AUTH
et un secret Secrets ManagerARN. Le secret peut être stocké dans le compte du cluster ou dans le compte de la fonction Lambda.
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:
us-east-1:111122223333
:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7
\ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations'[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
Utilisation d'un MSK cluster Amazon comme source d'événements
Lorsque vous ajoutez votre MSK cluster Apache Kafka ou Amazon comme déclencheur pour votre fonction Lambda, le cluster est utilisé comme source d'événements.
Lambda lit les données d'événements des sujets Kafka que vous spécifiez Topics
dans une CreateEventSourceMappingdemande, en fonction de StartingPosition
ce que vous spécifiez. Lorsque le traitement a réussi, votre rubrique Kafka est validée dans votre cluster Kafka.
Si vous spécifiez StartingPosition
comme LATEST
, Lambda commence à lire à partir du dernier message dans chaque partition de la rubrique. Un certain temps pouvant s’écouler après la configuration du déclencheur avant que Lambda commence à lire les messages, Lambda ne lit aucun message produit pendant cette fenêtre de temps.
Lambda lit les messages de manière séquentielle pour chaque partition de rubrique Kafka. Une seule charge utile Lambda peut contenir des messages provenant de plusieurs partitions. Lorsque d'autres enregistrements sont disponibles, Lambda continue de traiter les enregistrements par lots, en fonction de la BatchSize
valeur que vous spécifiez dans une CreateEventSourceMappingdemande, jusqu'à ce que votre fonction aborde le sujet.
Après avoir traité chaque lot, Lambda valide les décalages des messages dans celui-ci. Si votre fonction renvoie une erreur pour l'un des messages d'un lot, Lambda réessaie le lot de messages complet jusqu'à ce que le traitement réussisse ou que les messages expirent. Vous pouvez envoyer les enregistrements qui échouent à toutes les tentatives vers une destination en cas d'échec pour un traitement ultérieur.
Note
Alors que les fonctions Lambda ont généralement un délai d'expiration maximal de 15 minutes, les mappages de sources d'événements pour Amazon, Apache Kafka autogéré, MSK Amazon DocumentDB et Amazon MQ pour ActiveMQ et RabbitMQ ne prennent en charge que les fonctions dont le délai d'expiration maximal est de 14 minutes. Cette contrainte garantit que le mappage des sources d’événements peut gérer correctement les erreurs de fonction et effectuer de nouvelles tentatives.
Positions de départ des interrogations et des flux
Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.
-
Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.
-
Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.
Ce comportement signifie que si vous spécifiez LATEST
comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON
ou AT_TIMESTAMP
.
CloudWatch Métriques Amazon
Lambda émet la métrique OffsetLag
pendant que votre fonction traite les registres. La valeur de cette métrique est la différence de décalage entre le dernier enregistrement inscrit dans la rubrique source de l’événement Kafka et le dernier enregistrement traité par le groupe de consommateurs de votre fonction. Vous pouvez utiliser OffsetLag
pour estimer la latence entre le moment où un enregistrement est ajouté et celui où votre groupe de consommateurs le traite.
Une tendance à la hausse de OffsetLag
peut indiquer des problèmes liés aux sondeurs dans le groupe de consommateurs de votre fonction. Pour de plus amples informations, veuillez consulter Afficher les métriques des fonctions Lambda.
Mise à l'échelle automatique de la source MSK d'événements Amazon
Lorsque vous créez initialement une source d'MSKévénements Amazon, Lambda alloue un consommateur pour traiter toutes les partitions du sujet Kafka. Chaque consommateur dispose de plusieurs processeurs exécutés en parallèle pour gérer des charges de travail accrues. Lambda augmente ou diminue automatiquement le nombre de consommateurs, en fonction de la charge de travail. Pour préserver l’ordre des messages dans chaque partition, le nombre maximum de consommateurs est de un par partition dans la rubrique.
Toutes les minutes, Lambda évalue le décalage de consommateurs de toutes les partitions dans la rubrique. Si le décalage est trop élevé, la partition reçoit des messages plus rapidement que Lambda ne peut les traiter. Si nécessaire, Lambda ajoute ou supprime des consommateurs dans la rubrique. Le processus de mise à l’échelle consistant à ajouter ou à supprimer des consommateurs a lieu dans les trois minutes suivant l’évaluation.
Si votre fonction Lambda cible est limitée, Lambda réduit le nombre de consommateurs. Cette action réduit la charge de travail de la fonction en diminuant le nombre de messages que les consommateurs peuvent échanger avec la fonction.
Pour contrôler le débit de votre rubrique Kafka, consultez la Métrique du décalage de délai Lambda émet pendant que votre fonction traite les enregistrements.
Pour vérifier le nombre d’invocations de fonctions qui se produisent en parallèle, vous pouvez également surveiller les métriques de simultanéité pour votre fonction.