Utilisation de Lambda avec Apache Kafka autogéré
Note
Si vous voulez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.
Lambda prend en charge Apache Kafka
Vous pouvez utiliser le service Kafka géré par AWS, Amazon Managed Streaming for Apache Kafka (Amazon MSK) ou un cluster Kafka autogéré. Pour plus de détails sur l'utilisation de Lambda avec Amazon MSK, consultez Utilisation de Lambda avec Amazon MSK.
Cette rubrique décrit comment utiliser Lambda avec un cluster Kafka autogéré. Dans la terminologie AWS, un cluster autogéré inclut des clusters Kafka hébergés non-AWS. Par exemple, vous pouvez héberger votre cluster Kafka avec un fournisseur de cloud tel que CloudKarafka
Apache Kafka en tant que source d'événement fonctionne de la même manière qu'Amazon Simple Queue Service (Amazon SQS) ou Amazon Kinesis. Lambda interroge en interne les nouveaux messages de la source d'événement, puis appelle de manière synchrone la fonction Lambda cible. Lambda lit les messages par lot et les fournit à votre fonction en tant que charge utile d'événement. La taille de lot maximale est configurable (par défaut, 100 messages).
Pour les sources d'événements basées sur Kafka, Lambda prend en charge les paramètres de contrôle du traitement par lots, tels que les fenêtres de traitement par lots et la taille des lots. Pour de plus amples informations, veuillez consulter Comportement de traitement par lots.
Pour un exemple d'utilisation de Kafka autogéré en tant que source d'événement, consultez Utilisation d'Apache Kafka autohébergé en tant que source d'événement pour AWS Lambda
Rubriques
- Exemple d'évènement
- Authentification de cluster Kafka
- Gestion des accès et des autorisations d'API
- Authentification et erreurs d’autorisation
- Configuration réseau
- Ajout d'un cluster Kafka en tant que source d'événement
- Utilisation d'un cluster Kafka en tant que source d'événement
- Mise à l'échelle automatique de la source d'événement Kafka
- Opérations d'API de source d'événement
- API de mappage de la source d'événement
- Métriques Amazon CloudWatch
- Paramètres de configuration Apache Kafka autogérés
Exemple d'évènement
Lambda envoie le lot de messages dans le paramètre d'événement quand il appelle votre fonction Lambda. La charge utile d'un événement contient un tableau de messages. Chaque élément de tableau contient des détails de la rubrique Kafka et l'identifiant de partition Kafka, ainsi qu'un horodatage et un message codé en base 64.
{ "eventSource": "SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }
Authentification de cluster Kafka
Lambda prend en charge plusieurs méthodes d'authentification auprès de votre cluster Apache Kafka autogéré. Veillez à configurer le cluster Kafka pour utiliser l'une des méthodes d'authentification prises en charge suivantes : Pour de plus amples informations sur la sécurité, veuillez consulter la section Sécurité
Accès VPC
Si seuls des utilisateurs de Kafka au sein de votre VPC accèdent à vos courtiers Kafka, vous devez configurer la source d'événement avec un accès Amazon Virtual Private Cloud (Amazon VPC).
Authentification SASL/SCRAM
Lambda prend en charge l'authentification SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism) avec chiffrement du protocole TLS (Transport Layer Security) (SASL_SSL
). Lambda envoie les informations d'identification chiffrées pour s'authentifier auprès du cluster. Lambda ne prend pas en charge SASL/SCRAM avec du texte en clair (SASL_PLAINTEXT
). Pour plus d'informations sur l'authentification SASL/SCRAM, consultez RFC 5802
Lambda prend également en charge l'authentification SASL/PLAIN. Comme ce mécanisme utilise des informations d'identification en texte clair, la connexion au serveur doit utiliser le chiffrement TLS pour garantir la protection des informations d'identification.
Pour l'authentification SASL, vous devez stocker les informations d'identification en tant que secret dans AWS Secrets Manager. Pour plus d'informations sur l'utilisation de Secrets Manager, consultez Didacticiel : création et récupération d'un secret dans le Guide de l'utilisateur AWS Secrets Manager.
Important
Pour utiliser Secrets Manager pour l'authentification, les secrets doivent être stockés dans la même région AWS que votre fonction Lambda.
Authentification TLS mutuelle
Mutual TLS (mTLS) fournit une authentification bidirectionnelle entre le client et le serveur. Le client envoie un certificat au serveur pour que le serveur vérifie le client, et le serveur envoie un certificat au client pour que le client vérifie le serveur.
Dans Apache Kafka autogéré, Lambda agit en tant que client. Vous configurez un certificat client (en tant que secret dans Secrets Manager) pour authentifier Lambda auprès des courtiers Kafka. Le certificat client doit être signé par une autorité de certification dans le magasin d'approbations du serveur.
Le cluster Kafka envoie un certificat de serveur à Lambda pour authentifier les courtiers auprès de Lambda. Le certificat de serveur peut être un certificat d'autorité de certification public ou un certificat CA/auto-signé privé. Le certificat d'autorité de certification publique doit être signé par une autorité de certification située du magasin d'approbations Lambda. Pour un certificat CA/auto-signé privé, vous configurez le certificat d'autorité de certification racine du serveur (en tant que secret dans Secrets Manager). Lambda utilise le certificat racine pour vérifier les courtiers Kafka.
Pour de plus amples informations sur mTLS, veuillez consulter Introduction d'une authentification TLS mutuelle pour Amazon MSK en tant que source d'événement
Configuration du secret du certificat client
Le secret CLIENT_CERTIFICATE_TLS_AUTH nécessite un champ de certificat et un champ de clé privée. Pour une clé privée chiffrée, le secret nécessite un mot de passe de clé privée. Le certificat et la clé privée doivent être au format PEM.
Note
Lambda prend en charge les algorithmes de chiffrement de clés privées PBES1
Le champ de certificat doit contenir une liste de certificats, commençant par le certificat client, suivi de tous les certificats intermédiaires et se terminant par le certificat racine. Chaque certificat doit commencer sur une nouvelle ligne avec la structure suivante :
-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----
Secrets Manager prend en charge les secrets jusqu'à 65 536 octets, ce qui offre suffisamment d'espace pour de longues chaînes de certificats.
La clé privée doit être au format PKCS #8
-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----
Pour une clé privée chiffrée, utilisez la structure suivante :
-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----
L'exemple suivant affiche le contenu d'un secret pour l'authentification mTLS à l'aide d'une clé privée chiffrée. Pour une clé privée chiffrée, incluez le mot de passe de clé privée dans le secret.
{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }
Configuration du secret du certificat d'autorité de certification racine du serveur
Vous créez ce secret si vos courtiers Kafka utilisent le chiffrement TLS avec des certificats signés par une autorité de certification privée. Vous pouvez utiliser le chiffrement TLS pour l'authentification VPC, SASL/SCRAM, SASL/PLAIN ou mTLS.
Le secret du certificat d'autorité de certification racine du serveur nécessite un champ contenant le certificat d'autorité de certification racine du courtier Kafka au format PEM. La structure du secret est présentée dans l'exemple suivant.
{ "certificate": "-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----"
Gestion des accès et des autorisations d'API
En plus d'accéder au cluster Amazon Kafka autogéré, votre fonction Lambda a besoin d'autorisations pour effectuer diverses actions d'API. Ajoutez ces autorisations au rôle d'exécution de votre fonction. Si vos utilisateurs ont besoin d'accéder à l'une des actions de l'API, ajoutez les autorisations requises à la stratégie d'identité de l'utilisateur ou du rôle AWS Identity and Access Management (IAM) correspondant.
Autorisations de fonction Lambda requises
Pour pouvoir créer et stocker des journaux dans un groupe de journaux dans Amazon CloudWatch Logs, votre fonction Lambda doit disposer des autorisations suivantes dans son rôle d'exécution :
Autorisations de fonction Lambda facultatives
Votre fonction Lambda peut également nécessiter ces autorisations pour :
-
Décrivez votre secret Secrets Manager.
-
Accéder à votre clé gérée par le client AWS Key Management Service (AWS KMS)
-
Accéder à votre Amazon VPC
Secrets Manager et autorisations AWS KMS
Selon le type de contrôle d'accès que vous configurez pour vos courtiers Kafka, votre fonction Lambda peut avoir besoin d'une autorisation pour accéder à votre secret Secrets Manager ou pour déchiffrer votre clé gérée par le client AWS KMS. Afin d'accéder à ces ressources, le rôle d'exécution de votre fonction doit disposer des autorisations suivantes :
Autorisations VPC
Si seuls des utilisateurs au sein d'un VPC peuvent accéder à votre cluster Apache Kafka autogéré, votre fonction Lambda doit être autorisée à accéder à vos ressources Amazon VPC. Ces ressources incluent les sous-réseaux, groupes de sécurité et interfaces réseau de votre VPC. Afin d'accéder à ces ressources, le rôle d'exécution de votre fonction doit disposer des autorisations suivantes :
Ajout d'autorisations à votre rôle d'exécution
Afin d'accéder à d'autres services AWS que votre cluster Apache Kafka autogéré utilise, Lambda utilise les stratégies d'autorisation que vous définissez dans le rôle d'exécution de votre fonction Lambda.
Par défaut, Lambda n'est pas autorisé à exécuter les actions obligatoires ou facultatives pour un cluster Apache Kafka autogéré. Vous devez créer et définir ces actions dans une stratégie d'approbation IAM, puis attacher celle-ci à votre rôle d'exécution. Cet exemple montre comment créer une stratégie autorisant Lambda à accéder à vos ressources Amazon VPC.
{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"*" } ] }
Pour plus d'informations sur la création d'un document de stratégie JSON dans la console IAM, consultez Création de stratégies sous l'onglet JSON dans le Guide de l'utilisateur IAM.
Octroi d'accès à des utilisateurs avec une politique IAM
Par défaut, les utilisateurs et les rôles n'ont pas l'autorisation d'effectuer des opérations d'API de source d'événement. Pour accorder l'accès aux utilisateurs de votre organisation ou de votre compte, vous pouvez ajouter ou mettre à jour une stratégie basée sur l'identité. Pour plus d'informations, consultez Contrôle de l'accès aux ressources AWS à l'aide de stratégies dans le Guide de l'utilisateur IAM.
Authentification et erreurs d’autorisation
Si l'une des autorisations requises pour consommer des données du cluster Kafka est manquante, Lambda affiche l'un des messages d'erreur suivants dans le mappage des sources d'événements sous LastProcessingResult.
Messages d'erreur
Le cluster n'a pas pu autoriser Lambda
Pour SASL/SCRAM ou mTLS, cette erreur indique que l'utilisateur fourni ne dispose pas de toutes les autorisations de liste de contrôle d'accès (ACL) Kafka requises suivantes :
-
DescripbeConfigs Cluster
-
Décrire un groupe
-
Lire le groupe
-
Décrire la rubrique
-
Lire la rubrique
Lorsque vous créez des listes ACL Kafka avec les autorisations kafka-cluster
requises, vous devez spécifier la rubrique et le groupe en tant que ressources. Le nom de la rubrique doit correspondre à la rubrique dans le mappage des sources d'événements. Le nom du groupe doit correspondre à l'UUID du mappage de la source d'événements.
Une fois que vous avez ajouté les autorisations requises au rôle d'exécution, il peut y avoir un délai de plusieurs minutes avant que les modifications ne prennent effet.
Échec de l'authentification SASL
Pour SASL/SCRAM ou SASL/PLAIN, cette erreur indique que les informations d'identification fournies ne sont pas valides.
Le serveur n'a pas réussi à authentifier Lambda
Cette erreur indique que l'agent Kafka n'a pas réussi à authentifier Lambda. Cette erreur se produit dans les conditions suivantes :
Vous n'avez pas fourni de certificat client pour l'authentification mTLS.
Vous avez fourni un certificat client, mais les courtiers ne sont pas configurés pour utiliser l'authentification mTLS.
Les courtiers Kafka ne font pas confiance à un certificat client.
Lambda n'a pas réussi à authentifier le serveur
Cette erreur indique que Lambda n'a pas réussi à authentifier l'agent Kafka. Cette erreur se produit dans les conditions suivantes :
Les courtiers Kafka utilisent des certificats auto-signés ou une autorité de certification privée, mais n'ont pas fourni le certificat d'autorité de certification racine du serveur.
Le certificat d'autorité de certification racine du serveur ne correspond pas à l'autorité de certification racine qui a signé le certificat du courtier.
La validation du nom d'hôte a échoué, car le certificat du courtier ne contient pas le nom DNS ou l'adresse IP du courtier comme autre nom de sujet.
Le certificat ou la clé privée fournis n'est pas valide
Cette erreur indique que le consommateur Kafka n'a pas pu utiliser le certificat ou la clé privée fournis. Assurez-vous que le certificat et la clé utilisent le format PEM et que le chiffrement de la clé privée utilise un algorithme PBES1.
Configuration réseau
Si vous configurez l'accès d'Amazon VPC à vos courtiers Kafka, Lambda doit avoir accès aux ressources Amazon VPC associées à votre cluster Kafka. Nous vous recommandons de déployer des AWS PrivateLink points de terminaison de VPC pour Lambda et AWS Security Token Service (AWS STS). Si l'agent utilise l'authentification, déployez également un point de terminaison de VPC pour Secrets Manager.
Vous pouvez aussi vous assurer que le VPC associé à votre cluster Kafka inclut une passerelle NAT par sous-réseau public. Pour de plus amples informations, veuillez consulter Accès à Internet et aux services pour des fonctions connectées à un VPC.
Vous devez configurer vos groupes de sécurité Amazon VPC avec les règles suivantes (au minimum) :
-
Règles entrantes – Autorisent tout le trafic sur le port de l'agent Kafka pour le groupe de sécurité spécifié en tant que source d'évènement. Kafka utilise le port 9092 par défaut.
-
Règles sortantes : autorisent tout le trafic sur le port 443 pour toutes les destinations. Autorisent tout le trafic sur le port de l'agent Kafka pour le groupe de sécurité spécifié en tant que source d'évènement. Kafka utilise le port 9092 par défaut.
-
Si vous utilisez des points de terminaison d'un VPC au lieu de la passerelle NAT, les groupes de sécurité associés aux points de terminaison du VPC doivent autoriser tout le trafic entrant sur le port 443 à partir des groupes de sécurité des sources d'événement.
Pour plus d'informations sur la configuration du réseau, consultez Configuration d'AWS Lambda avec un cluster Apache Kafka au sein d'un VPC
Ajout d'un cluster Kafka en tant que source d'événement
Pour créer un mappage de source d'événement, ajoutez votre cluster Kafka en tant que déclencheur de fonction Lambda à l'aide de la console Lambda, d'un kit SDK AWS
Cette section explique comment créer un mappage de source d'événement à l'aide de la console Lambda et de l'AWS CLI.
Note
Lorsque vous mettez à jour, désactivez ou supprimez un mappage des sources d'événements pour Apache Kafka autogéré, la prise en compte de vos modifications peut prendre jusqu'à 15 minutes. Avant la fin de cette période, votre mappage des sources d'événements peut continuer à traiter les événements et à invoquer votre fonction à l'aide de vos paramètres précédents. Cela est vrai même lorsque l'état du mappage des sources d'événements affiché dans la console indique que vos modifications ont été appliquées.
Prérequis
-
Cluster Apache Kafka autogéré. Lambda prend en charge Apache Kafka versions 0.10.0.0 et ultérieures.
-
Un rôle d'exécution avec l'autorisation d'accéder aux ressources AWS utilisées par votre cluster Kafka autogéré.
Identifiant de groupe de consommateurs personnalisable
Lorsque vous configurez Kafka comme source d'événements, vous pouvez spécifier un identifiant de groupe de consommateurs. Cet identifiant de groupe de consommateurs est un identifiant existant pour le groupe de clients Kafka auquel vous souhaitez rattacher votre fonction Lambda. Vous pouvez utiliser cette fonction pour migrer facilement toutes les configurations de traitement d'enregistrements Kafka en cours depuis d'autres clients vers Lambda.
Si vous spécifiez un identifiant de groupe de consommateurs et qu'il existe d'autres sondeurs actifs au sein de ce groupe de consommateurs, Kafka distribue des messages à tous les consommateurs. En d'autres termes, Lambda ne reçoit pas l'intégralité du message relatif au sujet Kafka. Si vous souhaitez que Lambda gère tous les messages de la rubrique, désactivez tous les autres sondeurs de ce groupe de consommateurs.
De plus, si vous spécifiez un identifiant de groupe de consommateurs et que Kafka trouve un groupe de consommateurs existant valide avec le même identifiant, Lambda ignore le paramètre StartingPosition
pour le mappage de la source d'événements. Lambda commence plutôt à traiter les enregistrements en fonction de la compensation engagée par le groupe de consommateurs. Si vous spécifiez un identifiant de groupe de consommateurs et que Kafka ne trouve aucun groupe de consommateurs existant, Lambda configure votre source d'événement avec le StartingPosition
spécifié.
L'identifiant du groupe de consommateurs que vous spécifiez doit être unique parmi toutes vos sources d'événements Kafka. Après avoir créé un mappage de sources d'événements Kafka avec l'identifiant de groupe de consommateurs spécifié, vous ne pouvez plus mettre à jour cette valeur.
Ajout d'un cluster Kafka autogéré (console)
Pour ajouter votre cluster Apache Kafka autogéré et une rubrique Kafka en tant que déclencheur pour votre fonction Lambda, procédez comme suit.
Pour ajouter un déclencheur Apache Kafka à votre fonction Lambda (console)
-
Ouvrez la page Functions (Fonctions)
de la console Lambda. -
Choisissez le nom de votre fonction Lambda.
-
Sous Function overview (Vue d'ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).
-
Sous Trigger configuration (Configuration du déclencheur), procédez comme suit :
-
Choisissez le type de déclencheur Apache Kafka.
-
Pour Bootstrap servers (Serveurs d'amorçage), entrez l'adresse de paire hôte et port d'un agent Kafka dans votre cluster, puis choisissez Add (Ajouter). Répétez l'opération pour chaque agent Kafka dans le cluster.
-
Pour Topic name (Nom de rubrique), entrez le nom de la rubrique Kafka utilisée pour stocker les registres dans le cluster.
-
(Facultatif) Pour Batch size (Taille de lot), entrez le nombre maximal de registres à recevoir dans un même lot.
-
Pour Batch window, veuillez saisir l'intervalle de temps maximal en secondes nécessaire à Lambda pour collecter des enregistrements avant d'appeler la fonction.
-
(Facultatif) Pour l'identifiant de groupe de consommateurs, entrez l'identifiant d'un groupe de consommateurs Kafka à rejoindre.
-
(Facultatif) Pour Starting position (Position de départ), choisissez Latest (Dernière) pour commencer à lire le flux à partir du dernier registre. Ou bien choisissez Horizon trim (Supprimer l'horizon) pour commencer à lire le flux à partir du premier registre disponible.
-
(Facultatif) Pour VPC, choisissez l'Amazon VPC pour votre cluster Kafka. Ensuite, choisissez le VPC subnets (Sous-réseaux VPC) et les VPC security groups (Groupes de sécurité VPC).
Ce paramètre est requis si seuls des utilisateurs au sein de votre VPC accèdent à vos courtiers.
-
(Facultatif) Pour Authentication (Authentification), choisissez Add (Ajouter), puis procédez comme suit :
-
Choisissez le protocole d'accès ou d'authentification des courtiers Kafka dans votre cluster.
-
Si votre agent Kafka utilise l'authentification SASL/PLAIN, choisissez BASIC_AUTH.
-
Si votre agent utilise une authentification SASL/SCRAM, choisissez l'un des protocoles SASL_SCRAM.
-
Si vous configurez l'authentification mTLS, choisissez le protocole CLIENT_CERTIFICATE_TLS_AUTH.
-
-
Pour l'authentification SASL/SCRAM ou mTLS, choisissez le nom de la clé secrète Secrets Manager contenant les informations d'identification de votre cluster Kafka.
-
-
(Facultatif) Pour Encryption (Chiffrement), choisissez le secret Secrets Manager contenant le certificat d'autorité de certification racine que vos courtiers Kafka utilisent pour le chiffrement TLS, si vos courtiers Kafka utilisent des certificats signés par une autorité de certification privée.
Ce paramètre s'applique au chiffrement TLS pour SASL/SCRAM ou SASL/PLAIN, ainsi qu'à l'authentification mTLS.
-
Pour créer le déclencheur dans un état désactivé pour le test (recommandé), désactivez Enable trigger (Activer le déclencheur). Ou, pour activer le déclencheur immédiatement, sélectionnezActiver un déclencheur.
-
-
Pour créer le déclencheur, choisissez Add (Ajouter).
Ajout d'un cluster Kafka autogéré (AWS CLI)
Utilisez les exemples de commandes AWS CLI suivants afin de créer et d'afficher un déclencheur Apache Kafka autogéré pour votre fonction Lambda.
Utilisation de SASL/SCRAM
Si des utilisateurs de Kafka accèdent à vos courtiers Kafka via Internet, spécifiez le secret Secrets Manager que vous avez créé pour l'authentification SASL/SCRAM. L'exemple suivant utilise la commande de l' create-event-source-mapping
AWS CLI pour mapper une fonction Lambda nommée my-kafka-function
à une rubrique Kafka nommée AWSKafkaTopic
.
aws lambda create-event-source-mapping --topics
AWSKafkaTopic
--source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:01234567890
:secret:MyBrokerSecretName
--function-name arn:aws:lambda:us-east-1:01234567890
:function:my-kafka-function
--self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092
", "abc2.xyz.com:9092
"]}}'
Pour plus d'informations, consultez la documentation de référence d'API CreateEventSourceMapping.
Utilisation d'un VPC
Si seuls des utilisateurs de Kafka au sein de votre VPC accèdent à vos agents Kafka, vous devez spécifier votre VPC, vos sous-réseaux et votre groupe de sécurité de VPC. L'exemple suivant utilise la commande de l' create-event-source-mapping
AWS CLI pour mapper une fonction Lambda nommée my-kafka-function
à une rubrique Kafka nommée AWSKafkaTopic
.
aws lambda create-event-source-mapping --topics
AWSKafkaTopic
--source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100
"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200
"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789
"}]' --function-name arn:aws:lambda:us-east-1:01234567890
:function:my-kafka-function
--self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092
", "abc2.xyz.com:9092
"]}}'
Pour plus d'informations, consultez la documentation de référence d'API CreateEventSourceMapping.
Affichage du statut par le biais de l AWS CLI
L'exemple suivant utilise la commande de l' get-event-source-mapping
AWS CLI pour décrire l'état du mappage de source d'événement que vous avez créé.
aws lambda get-event-source-mapping --uuid
dh38738e-992b-343a-1077-3478934hjkfd7
Utilisation d'un cluster Kafka en tant que source d'événement
Lorsque vous ajoutez votre cluster Apache Kafka comme déclencheur pour votre fonction Lambda, le cluster est utilisé comme source d'événement.
Lambda lit les données d'événement à partir des rubriques Kafka que vous spécifiez comme Topics
dans une requête CreateEventSourceMapping, en fonction de la StartingPosition
que vous spécifiez. Lorsque le traitement a réussi, votre rubrique Kafka est validée dans votre cluster Kafka.
Si vous spécifiez StartingPosition
comme LATEST
, Lambda commence à lire à partir du dernier message dans chaque partition de la rubrique. Un certain temps pouvant s'écouler après la configuration du déclencheur avant que Lambda commence à lire les messages, Lambda ne lit aucun message produit pendant cette fenêtre de temps.
Lambda traite les registres d'une ou plusieurs partitions de rubrique Kafka que vous spécifiez et envoie une charge utile JSON à votre fonction Lambda. Lorsque des registres supplémentaires sont disponibles, Lambda continue de les traiter par lots, en fonction de la valeur BatchSize
que vous spécifiez dans une requête CreateEventSourceMapping, jusqu'à ce que la fonction rattrape la rubrique.
Si votre fonction renvoie une erreur pour l'un des messages d'un lot, Lambda réessaie le lot de messages complet jusqu'à ce que le traitement réussisse ou que les messages expirent.
Note
Alors que les fonctions Lambda ont généralement un délai d'expiration maximal de 15 minutes, les mappages de sources d'événement pour Amazon MSK, Apache Kafka autogéré et Amazon MQ pour ActiveMQ et RabbitMQ ne prennent en charge que les fonctions dont le délai d'expiration maximal est de 14 minutes. Cette contrainte garantit que le mappage de sources d'événement peut gérer correctement les erreurs de fonction et effectuer de nouvelles tentatives.
Mise à l'échelle automatique de la source d'événement Kafka
Lorsque vous créez initialement une source d'événement Apache Kafka, Lambda alloue un consommateur pour traiter toutes les partitions de la rubrique Kafka. Chaque consommateur dispose de plusieurs processeurs exécutés en parallèle pour gérer des charges de travail accrues. Lambda augmente ou diminue automatiquement le nombre de consommateurs, en fonction de la charge de travail. Pour préserver l'ordre des messages dans chaque partition, le nombre maximum de consommateurs est de un par partition dans la rubrique.
Toutes les minutes, Lambda évalue le décalage de consommateurs de toutes les partitions dans la rubrique. Si le décalage est trop élevé, la partition reçoit des messages plus rapidement que Lambda ne peut les traiter. Si nécessaire, Lambda ajoute ou supprime des consommateurs dans la rubrique. Le processus de mise à l'échelle consistant à ajouter ou à supprimer des consommateurs a lieu dans les trois minutes suivant l'évaluation.
Si votre fonction Lambda cible est surchargée, Lambda réduit le nombre de consommateurs. Cette action réduit la charge de travail de la fonction en diminuant le nombre de messages que les consommateurs peuvent échanger avec la fonction.
Pour surveiller le débit de votre rubrique Kafka, vous pouvez afficher les métriques de consommateurs Apache Kafka, telles que consumer_lag
et consumer_offset
. Pour vérifier le nombre d'appels de fonctions qui se produisent en parallèle, vous pouvez également surveiller les métriques de simultanéité pour votre fonction.
Opérations d'API de source d'événement
Lorsque vous ajoutez votre cluster Kafka en tant que source d'événement pour votre fonction Lambda à l'aide de la console Lambda, d'un kit SDK AWS ou de l'AWS CLI, Lambda utilise des API pour traiter votre demande.
Pour gérer une source d’événement à l’aide de la AWS Command Line Interface (AWS CLI) ou d’un AWS SDK
API de mappage de la source d'événement
Lorsque vous ajoutez votre cluster Apache Kafka en tant que source d'événement pour votre fonction Lambda, si votre fonction rencontre une erreur, votre consommateur Kafka cesse de traiter les registres. Les consommateurs d'une partition de rubrique sont ceux qui s'abonnent à vos registres et qui les lisent et traitent. Vos autres consommateurs Kafka peuvent continuer à traiter les registres, à condition qu'ils ne rencontrent pas la même erreur.
Afin d'identifier les raisons pour lesquelles un consommateur cesse son traitement, vérifiez le champ StateTransitionReason
dans la réponse de EventSourceMapping
. La liste suivante décrit les erreurs de source d'événement que vous pouvez recevoir :
ESM_CONFIG_NOT_VALID
-
La configuration de mappage de source d'événement n'est pas valide.
EVENT_SOURCE_AUTHN_ERROR
-
Lambda n'a pas pu authentifier la source d'événement.
EVENT_SOURCE_AUTHZ_ERROR
-
Lambda ne dispose pas des autorisations requises pour accéder à la source d'événement.
FUNCTION_CONFIG_NOT_VALID
-
La configuration de la fonction n'est pas valide.
Note
Si vos registres d'événement Lambda dépassent la limite de taille autorisée de 6 Mo, il se peut qu'ils ne soient pas traités.
Métriques Amazon CloudWatch
Lambda émet la métrique OffsetLag
pendant que votre fonction traite les registres. La valeur de cette métrique est la différence de décalage entre le dernier enregistrement inscrit dans la rubrique source de l'événement Kafka et le dernier enregistrement traité par le groupe de consommateurs de votre fonction. Vous pouvez utiliser OffsetLag
pour estimer la latence entre le moment où un enregistrement est ajouté et celui où votre groupe de consommateurs le traite.
Une tendance à la hausse de OffsetLag
peut indiquer des problèmes liés aux sondeurs dans le groupe de consommateurs de votre fonction. Pour de plus amples informations, veuillez consulter Utilisation des métriques de fonction Lambda.
Paramètres de configuration Apache Kafka autogérés
Tous les types de sources d'événement Lambda partagent les mêmes opérations d'API CreateEventSourceMapping et UpdateEventSourceMapping. Cependant, seuls certains paramètres s'appliquent à Apache Kafka.
Paramètres de source d'événement qui s'appliquent à Apache Kafka autogéré | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Paramètre | Obligatoire | Par défaut | Remarques | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BatchSize |
N |
100 |
Maximum : 10 000. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Activé |
N |
Activé |
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FunctionName |
Y |
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FilterCriteria |
N |
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumBatchingWindowInSeconds |
N |
500 ms |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SelfManagedEventSource |
O |
Liste des agents Kafka. Peut définir uniquement sur Create (Créer) |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SelfManagedKafkaEventSourceConfig |
N |
Contient le champ ConsumerGroupId dont la valeur par défaut est unique. |
Peut définir uniquement sur Create (Créer) |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SourceAccessConfigurations |
N |
Pas d'informations d'identification |
Informations sur le VPC ou informations d'authentification pour le cluster Pour SASL_PLAIN, définissez la valeur BASIC_AUTH |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
StartingPosition |
O |
|
TRIM_HORIZON ou LATEST Peut définir uniquement sur Create (Créer) |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Rubriques |
O |
|
Nom de la rubrique Peut définir uniquement sur Create (Créer) |