Utilisation de Lambda avec Apache Kafka autogéré - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de Lambda avec Apache Kafka autogéré

Note

Si vous souhaitez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.

Lambda prend en charge Apache Kafka en tant que source d’événement. Apache Kafka est une plateforme open source de streaming d’événements qui prend en charge des charges de travail telles que les pipelines de données et l’analytique de streaming.

Vous pouvez utiliser le service Kafka AWS géré Amazon Managed Streaming for Apache Kafka (Amazon MSK) ou un cluster Kafka autogéré. Pour plus de détails sur l’utilisation de Lambda avec Amazon MSK, consultez Utilisation de Lambda avec Amazon MSK.

Cette rubrique décrit comment utiliser Lambda avec un cluster Kafka autogéré. En AWS termes terminologiques, un cluster autogéré inclut les clusters Kafka non AWS hébergés. Par exemple, vous pouvez héberger votre cluster Kafka avec un fournisseur de cloud tel que Confluent Cloud.

Apache Kafka en tant que source d’événement fonctionne de la même manière qu’Amazon Simple Queue Service (Amazon SQS) ou Amazon Kinesis. Lambda interroge en interne les nouveaux messages de la source d’événement, puis invoque de manière synchrone la fonction Lambda cible. Lambda lit les messages par lot et les fournit à votre fonction en tant que charge utile d’événement. La taille de lot maximale est configurable. (par défaut, 100 messages).

Avertissement

Les mappages de sources d'événements Lambda traitent chaque événement au moins une fois et le traitement par lots peut se produire en double. Pour éviter les problèmes potentiels liés à des événements dupliqués, nous vous recommandons vivement de rendre votre code de fonction idempotent. Pour en savoir plus, consultez Comment rendre ma fonction Lambda idempotente dans le Knowledge Center. AWS

Pour les sources d’événements basées sur Kafka, Lambda prend en charge les paramètres de contrôle du traitement par lots, tels que les fenêtres de traitement par lots et la taille des lots. Pour plus d'informations, consultez Comportement de traitement par lots.

Pour un exemple d'utilisation de Kafka autogéré comme source d'événements, consultez la section Utilisation d'Apache Kafka auto-hébergée comme source d'événements AWS Lambda sur le blog Compute. AWS

Exemple d’évènement

Lambda envoie le lot de messages dans le paramètre d’événement quand il invoque votre fonction Lambda. La charge utile d’un événement contient un tableau de messages. Chaque élément de tableau contient des détails de la rubrique Kafka et l’identifiant de partition Kafka, ainsi qu’un horodatage et un message codé en base 64.

{ "eventSource": "SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Authentification de cluster Kafka

Lambda prend en charge plusieurs méthodes d’authentification auprès de votre cluster Apache Kafka autogéré. Veillez à configurer le cluster Kafka pour utiliser l’une des méthodes d’authentification prises en charge suivantes : Pour de plus amples informations sur la sécurité, veuillez consulter la section Sécurité de la documentation Kafka.

Accès VPC

Si seuls des utilisateurs de Kafka au sein de votre VPC accèdent à vos courtiers Kafka, vous devez configurer la source d’événement avec un accès Amazon Virtual Private Cloud (Amazon VPC).

Authentification SASL/SCRAM

Lambda prend en charge l’authentification SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism) avec chiffrement du protocole TLS (Transport Layer Security) (SASL_SSL). Lambda envoie les informations d’identification chiffrées pour s’authentifier auprès du cluster. Lambda ne prend pas en charge SASL/SCRAM avec du texte en clair (SASL_PLAINTEXT). Pour plus d’informations sur l’authentification SASL/SCRAM, consultez RFC 5802.

Lambda prend également en charge l’authentification SASL/PLAIN. Comme ce mécanisme utilise des informations d’identification en texte clair, la connexion au serveur doit utiliser le chiffrement TLS pour garantir la protection des informations d’identification.

Pour l’authentification SASL, vous devez stocker les informations d’identification en tant que secret dans AWS Secrets Manager. Pour plus d’informations sur l’utilisation de Secrets Manager, consultez Didacticiel : création et récupération d’un secret dans le Guide de l’utilisateur AWS Secrets Manager .

Important

Pour utiliser Secrets Manager pour l'authentification, les secrets doivent être stockés dans la même AWS région que votre fonction Lambda.

Authentification TLS mutuelle

Mutual TLS (mTLS) fournit une authentification bidirectionnelle entre le client et le serveur. Le client envoie un certificat au serveur pour que le serveur vérifie le client, et le serveur envoie un certificat au client pour que le client vérifie le serveur.

Dans Apache Kafka autogéré, Lambda agit en tant que client. Vous configurez un certificat client (en tant que secret dans Secrets Manager) pour authentifier Lambda auprès des courtiers Kafka. Le certificat client doit être signé par une autorité de certification dans le magasin d’approbations du serveur.

Le cluster Kafka envoie un certificat de serveur à Lambda pour authentifier les courtiers auprès de Lambda. Le certificat de serveur peut être un certificat d’autorité de certification public ou un certificat CA/auto-signé privé. Le certificat d’autorité de certification publique doit être signé par une autorité de certification située du magasin d’approbations Lambda. Pour un certificat CA/auto-signé privé, vous configurez le certificat d’autorité de certification racine du serveur (en tant que secret dans Secrets Manager). Lambda utilise le certificat racine pour vérifier les courtiers Kafka.

Pour de plus amples informations sur mTLS, veuillez consulter Introduction d’une authentification TLS mutuelle pour Amazon MSK en tant que source d’événement.

Configuration du secret du certificat client

Le secret CLIENT_CERTIFICATE_TLS_AUTH nécessite un champ de certificat et un champ de clé privée. Pour une clé privée chiffrée, le secret nécessite un mot de passe de clé privée. Le certificat et la clé privée doivent être au format PEM.

Note

Lambda prend en charge les algorithmes de chiffrement de clés privées PBES1 (mais pas PBES2).

Le champ de certificat doit contenir une liste de certificats, commençant par le certificat client, suivi de tous les certificats intermédiaires et se terminant par le certificat racine. Chaque certificat doit commencer sur une nouvelle ligne avec la structure suivante :

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager prend en charge les secrets jusqu’à 65 536 octets, ce qui offre suffisamment d’espace pour de longues chaînes de certificats.

La clé privée doit être au format PKCS #8, avec la structure suivante :

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

Pour une clé privée chiffrée, utilisez la structure suivante :

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

L’exemple suivant affiche le contenu d’un secret pour l’authentification mTLS à l’aide d’une clé privée chiffrée. Pour une clé privée chiffrée, incluez le mot de passe de clé privée dans le secret.

{"privateKeyPassword":"testpassword", "certificate":"-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Configuration du secret du certificat d’autorité de certification racine du serveur

Vous créez ce secret si vos courtiers Kafka utilisent le chiffrement TLS avec des certificats signés par une autorité de certification privée. Vous pouvez utiliser le chiffrement TLS pour l’authentification VPC, SASL/SCRAM, SASL/PLAIN ou mTLS.

Le secret du certificat d’autorité de certification racine du serveur nécessite un champ contenant le certificat d’autorité de certification racine du courtier Kafka au format PEM. La structure du secret est présentée dans l’exemple suivant.

{"certificate":"-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----" }

Gestion des accès et des autorisations d’API

En plus d’accéder au cluster Amazon Kafka autogéré, votre fonction Lambda a besoin d’autorisations pour effectuer diverses actions d’API. Ajoutez ces autorisations au rôle d’exécution de votre fonction. Si vos utilisateurs ont besoin d'accéder à des actions d'API, ajoutez les autorisations requises à la politique d'identité de l'utilisateur ou du rôle AWS Identity and Access Management (IAM).

Autorisations de fonction Lambda requises

Pour créer et stocker des journaux dans un groupe de CloudWatch journaux dans Amazon Logs, votre fonction Lambda doit disposer des autorisations suivantes dans son rôle d'exécution :

Autorisations de fonction Lambda facultatives

Votre fonction Lambda peut également nécessiter ces autorisations pour :

  • Décrivez votre secret Secrets Manager.

  • Accédez à votre AWS Key Management Service (AWS KMS) clé gérée par le client.

  • Accédez à votre Amazon VPC.

  • Envoyez les enregistrements des invocations ayant échoué vers une destination.

Secrets Manager et AWS KMS autorisations

Selon le type de contrôle d'accès que vous configurez pour vos courtiers Kafka, votre fonction Lambda peut avoir besoin d'une autorisation pour accéder à votre secret Secrets Manager ou pour déchiffrer AWS KMS votre clé gérée par le client. Afin d’accéder à ces ressources, le rôle d’exécution de votre fonction doit disposer des autorisations suivantes :

Autorisations VPC

Si seuls des utilisateurs au sein d’un VPC peuvent accéder à votre cluster Apache Kafka autogéré, votre fonction Lambda doit être autorisée à accéder à vos ressources Amazon VPC. Ces ressources incluent les sous-réseaux, groupes de sécurité et interfaces réseau de votre VPC. Afin d’accéder à ces ressources, le rôle d’exécution de votre fonction doit disposer des autorisations suivantes :

Envoi d’enregistrements vers une destination

Si vous souhaitez envoyer des enregistrements d'invocation ayant échoué vers une destination en cas d’échec, votre fonction Lambda doit être autorisée à envoyer ces enregistrements. Pour les mappages de sources d'événements Kafka, vous pouvez choisir entre une rubrique Amazon SNS, une file d'attente Amazon SQS ou un compartiment Amazon S3 comme destination. Pour envoyer des enregistrements à une rubrique SNS, le rôle d’exécution de votre fonction doit avoir les autorisations suivantes :

Pour envoyer des enregistrements à une rubrique SQS, le rôle d’exécution de votre fonction doit avoir les autorisations suivantes :

Pour envoyer des enregistrements à un compartiment S3, le rôle d’exécution de votre fonction doit avoir les autorisations suivantes :

En outre, si vous avez configuré une clé KMS sur votre destination, Lambda a besoin des autorisations suivantes en fonction du type de destination :

Ajout d’autorisations à votre rôle d’exécution

Pour accéder aux autres AWS services utilisés par votre cluster Apache Kafka autogéré, Lambda utilise les politiques d'autorisation que vous définissez dans le rôle d'exécution de votre fonction Lambda.

Par défaut, Lambda n’est pas autorisé à exécuter les actions obligatoires ou facultatives pour un cluster Apache Kafka autogéré. Vous devez créer et définir ces actions dans une stratégie d’approbation IAM, puis attacher celle-ci à votre rôle d’exécution. Cet exemple montre comment créer une stratégie autorisant Lambda à accéder à vos ressources Amazon VPC.

{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"*" } ] }

Pour plus d’informations sur la création d’un document de stratégie JSON dans la console IAM, consultez Création de stratégies sous l’onglet JSON dans le Guide de l’utilisateur IAM.

Octroi d’accès à des utilisateurs avec une politique IAM

Par défaut, les utilisateurs et les rôles n’ont pas l’autorisation d’effectuer des opérations d’API de source d’événement. Pour accorder l’accès aux utilisateurs de votre organisation ou de votre compte, vous pouvez ajouter ou mettre à jour une stratégie basée sur l’identité. Pour plus d'informations, consultez la section Contrôle de l'accès aux AWS ressources à l'aide de politiques dans le Guide de l'utilisateur IAM.

Authentification et erreurs d’autorisation

Si l'une des autorisations requises pour consommer les données du cluster Kafka est manquante, Lambda affiche l'un des messages d'erreur suivants dans le mappage des sources d'événements ci-dessous. LastProcessingResult

Le cluster n’a pas pu autoriser Lambda

Pour SASL/SCRAM ou mTLS, cette erreur indique que l’utilisateur fourni ne dispose pas de toutes les autorisations de liste de contrôle d’accès (ACL) Kafka requises suivantes :

  • DescribeConfigs Cluster

  • Décrire un groupe

  • Lire le groupe

  • Décrire la rubrique

  • Lire la rubrique

Lorsque vous créez des listes ACL Kafka avec les autorisations kafka-cluster requises, vous devez spécifier la rubrique et le groupe en tant que ressources. Le nom de la rubrique doit correspondre à la rubrique dans le mappage des sources d’événements. Le nom du groupe doit correspondre à l’UUID du mappage des sources d’événements.

Une fois que vous avez ajouté les autorisations requises au rôle d’exécution, il peut y avoir un délai de plusieurs minutes avant que les modifications ne prennent effet.

Échec de l’authentification SASL

Pour SASL/SCRAM ou SASL/PLAIN, cette erreur indique que les informations d’identification fournies ne sont pas valides.

Le serveur n’a pas réussi à authentifier Lambda

Cette erreur indique que l’agent Kafka n’a pas réussi à authentifier Lambda. Cette erreur se produit dans les conditions suivantes :

  • Vous n’avez pas fourni de certificat client pour l’authentification mTLS.

  • Vous avez fourni un certificat client, mais les courtiers ne sont pas configurés pour utiliser l’authentification mTLS.

  • Les courtiers Kafka ne font pas confiance à un certificat client.

Lambda n’a pas réussi à authentifier le serveur

Cette erreur indique que Lambda n’a pas réussi à authentifier l’agent Kafka. Cette erreur se produit dans les conditions suivantes :

  • Les courtiers Kafka utilisent des certificats auto-signés ou une autorité de certification privée, mais n’ont pas fourni le certificat d’autorité de certification racine du serveur.

  • Le certificat d’autorité de certification racine du serveur ne correspond pas à l’autorité de certification racine qui a signé le certificat du courtier.

  • La validation du nom d’hôte a échoué, car le certificat du courtier ne contient pas le nom DNS ou l’adresse IP du courtier comme autre nom de sujet.

Le certificat ou la clé privée fournis n’est pas valide

Cette erreur indique que le consommateur Kafka n’a pas pu utiliser le certificat ou la clé privée fournis. Assurez-vous que le certificat et la clé utilisent le format PEM et que le chiffrement de la clé privée utilise un algorithme PBES1.

Configuration réseau

Si vous configurez l’accès d’Amazon VPC à vos courtiers Kafka, Lambda doit avoir accès aux ressources Amazon VPC associées dans lesquelles réside votre cluster Kafka. Nous vous recommandons de déployer des points de terminaison AWS PrivateLink VPC pour Lambda et (). AWS Security Token Service AWS STS Si l’agent utilise l’authentification, déployez également un point de terminaison de VPC pour Secrets Manager. Si vous avez configuré une destination en cas d’échec, déployez également un point de terminaison de VPC pour le service de destination.

Vous pouvez aussi vous assurer que le VPC associé à votre cluster Kafka inclut une passerelle NAT par sous-réseau public. Pour plus d’informations, consultez Accès à Internet et aux services pour des fonctions connectées à un VPC.

Si vous utilisez des points de terminaison VPC, vous devez également les configurer pour activer les noms DNS privés.

Vous devez configurer vos groupes de sécurité Amazon VPC avec les règles suivantes (au minimum) :

  • Règles entrantes – Autorisent tout le trafic sur le port de l’agent Kafka pour le groupe de sécurité spécifié en tant que source d’évènement. Kafka utilise le port 9092 par défaut.

  • Règles sortantes : autorisent tout le trafic sur le port 443 pour toutes les destinations. Autorisent tout le trafic sur le port de l’agent Kafka pour le groupe de sécurité spécifié en tant que source d’évènement. Kafka utilise le port 9092 par défaut.

  • Si vous utilisez des points de terminaison d’un VPC au lieu de la passerelle NAT, les groupes de sécurité associés aux points de terminaison du VPC doivent autoriser tout le trafic entrant sur le port 443 à partir des groupes de sécurité des sources d’événement.

Pour plus d'informations sur la configuration du réseau, consultez Configuration AWS Lambda avec un cluster Apache Kafka au sein d'un VPC sur AWS le blog Compute.

Ajout d’un cluster Kafka en tant que source d’événement

Pour créer un mappage de source d’événement, ajoutez votre cluster Kafka en tant que déclencheur de fonction Lambda à l’aide de la console Lambda, d’un kit SDK AWS, ou de l’AWS Command Line Interface (AWS CLI).

Cette section explique comment créer un mappage des sources d’événements à l’aide de la console Lambda et de l’ AWS CLI.

Prérequis

  • Cluster Apache Kafka autogéré. Lambda prend en charge Apache Kafka versions 0.10.1.0 et ultérieures.

  • Rôle d'exécution autorisé à accéder aux AWS ressources utilisées par votre cluster Kafka autogéré.

Identifiant de groupe de consommateurs personnalisable

Lorsque vous configurez Kafka comme source d’événements, vous pouvez spécifier un identifiant de groupe de consommateurs. Cet identifiant de groupe de consommateurs est un identifiant existant pour le groupe de clients Kafka auquel vous souhaitez rattacher votre fonction Lambda. Vous pouvez utiliser cette fonction pour migrer facilement toutes les configurations de traitement d’enregistrements Kafka en cours depuis d’autres clients vers Lambda.

Si vous spécifiez un identifiant de groupe de consommateurs et qu’il existe d’autres sondeurs actifs au sein de ce groupe de consommateurs, Kafka distribue des messages à tous les consommateurs. En d’autres termes, Lambda ne reçoit pas l’intégralité du message relatif au sujet Kafka. Si vous souhaitez que Lambda gère tous les messages de la rubrique, désactivez tous les autres sondeurs de ce groupe de consommateurs.

De plus, si vous spécifiez un identifiant de groupe de consommateurs et que Kafka trouve un groupe de consommateurs existant valide avec le même identifiant, Lambda ignore le paramètre StartingPosition pour le mappage des sources d’événements. Lambda commence plutôt à traiter les enregistrements en fonction de la compensation engagée par le groupe de consommateurs. Si vous spécifiez un identifiant de groupe de consommateurs et que Kafka ne trouve aucun groupe de consommateurs existant, Lambda configure votre source d’événement avec le StartingPosition spécifié.

L’identifiant du groupe de consommateurs que vous spécifiez doit être unique parmi toutes vos sources d’événements Kafka. Après avoir créé un mappage des sources d’événements Kafka avec l’identifiant de groupe de consommateurs spécifié, vous ne pouvez plus mettre à jour cette valeur.

Destinations en cas d’échec

Pour retenir les enregistrements des invocations échouées ou des charges utiles surdimensionnées provenant de votre source d’événements Kafka, configurez une destination en cas d’échec pour votre fonction. Quand une invocation échoue, Lambda envoie à votre destination un enregistrement JSON avec des détails sur l’invocation.

Vous pouvez choisir entre une rubrique Amazon SNS, une file d’attente Amazon SQS ou un compartiment Amazon S3 comme destination. Pour les destinations de rubriques SNS ou de files d’attente SQS, Lambda envoie les métadonnées d’enregistrement à la destination. Pour les destinations de compartiment S3, Lambda envoie l’intégralité de l’enregistrement d’invocation ainsi que les métadonnées à la destination.

Pour que Lambda envoie correctement les enregistrements vers la destination que vous avez choisie, assurez-vous que le rôle d’exécution de votre fonction contient les autorisations appropriées. Le tableau décrit également comment chaque type de destination reçoit l’enregistrement d’invocation JSON.

Type de destination Pris en charge pour les sources d’événements suivantes Autorisations nécessaires Format JSON spécifique à la destination

File d’attente Amazon SQS

  • Kinesis

  • DynamoDB

  • Apache Kafka autogéré et Apache Kafka géré

Lambda transmet les métadonnées d’enregistrement d’invocation en tant que Message à la destination.

Rubrique Amazon SNS

  • Kinesis

  • DynamoDB

  • Apache Kafka autogéré et Apache Kafka géré

Lambda transmet les métadonnées d’enregistrement d’invocation en tant que Message à la destination.

Compartiment Amazon S3

  • Apache Kafka autogéré et Apache Kafka géré

Lambda stocke l’enregistrement d’invocation ainsi que ses métadonnées à la destination.

Astuce

La bonne pratique consiste à n’inclure les autorisations minimales requises que dans votre rôle d’exécution.

Destinations SNS et SQS

L’exemple suivant montre ce que Lambda envoie à une rubrique SNS ou à une destination de file d’attente SQS pour une invocation de source d’événement Kafka qui a échoué. Chacune des clés sous recordsInfo contient à la fois le sujet et la partition Kafka, séparés par un trait d’union. Par exemple, pour la clé"Topic-0", Topic est la rubrique Kafka, et 0 est la partition. Pour chaque sujet et chaque partition, vous pouvez utiliser les décalages et les données d’horodatage pour trouver les enregistrements d’invocation d’origine.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Destinations S3

Pour les destinations S3, Lambda envoie l’intégralité de l’enregistrement d’invocation ainsi que les métadonnées à la destination. L’exemple suivant montre que Lambda envoie vers une destination de compartiment S3 en cas d’échec d’une invocation de source d’événement Kafka. Outre tous les champs de l’exemple précédent pour les destinations SQS et SNS, le champ payload contient l’enregistrement d’invocation d’origine sous forme de chaîne JSON échappée.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Astuce

Nous vous recommandons d’activer la gestion des versions S3 sur votre compartiment de destination.

Configuration des destinations en cas de panne

Pour configurer une destination en cas de panne à l’aide de la console, procédez comme suit :

  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez une fonction.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add destination (Ajouter une destination).

  4. Pour Source, choisissez Invocation du mappage des sources d’événements.

  5. Pour le mappage des sources d’événements, choisissez une source d’événements configurée pour cette fonction.

  6. Pour Condition, sélectionnez En cas d’échec. Pour les invocations de mappage des sources d’événements, il s’agit de la seule condition acceptée.

  7. Pour Type de destination, choisissez le type de destination auquel Lambda envoie les enregistrements d’invocation.

  8. Pour Destination, choisissez une ressource.

  9. Choisissez Enregistrer.

Vous pouvez également configurer une destination en cas de panne à l’aide de l’API Lambda. Par exemple, la commande CreateEventSourceMappingCLI suivante ajoute une destination SQL en cas de défaillance à : MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

La commande UpdateEventSourceMappingCLI suivante ajoute une destination S3 en cas de défaillance à la source d'événements Kafka associée à l'entrée : uuid

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Pour supprimer une destination, entrez une chaîne vide comme argument du paramètre destination-config :

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Ajout d’un cluster Kafka autogéré (console)

Pour ajouter votre cluster Apache Kafka autogéré et une rubrique Kafka en tant que déclencheur pour votre fonction Lambda, procédez comme suit.

Pour ajouter un déclencheur Apache Kafka à votre fonction Lambda (console)
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez le nom de votre fonction Lambda.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).

  4. Sous Trigger configuration (Configuration du déclencheur), procédez comme suit :

    1. Choisissez le type de déclencheur Apache Kafka.

    2. Pour Bootstrap servers (Serveurs d’amorçage), entrez l’adresse de paire hôte et port d’un agent Kafka dans votre cluster, puis choisissez Add (Ajouter). Répétez l’opération pour chaque agent Kafka dans le cluster.

    3. Pour Topic name (Nom de rubrique), entrez le nom de la rubrique Kafka utilisée pour stocker les registres dans le cluster.

    4. (Facultatif) Pour Batch size (Taille de lot), entrez le nombre maximal de registres à recevoir dans un même lot.

    5. Pour Batch window, veuillez saisir l’intervalle de temps maximal en secondes nécessaire à Lambda pour collecter des enregistrements avant d’invoquer la fonction.

    6. (Facultatif) Pour l’identifiant de groupe de consommateurs, entrez l’identifiant d’un groupe de consommateurs Kafka à rejoindre.

    7. (Facultatif) Pour Position de départ, choisissez Dernier pour commencer à lire le flux à partir du dernier enregistrement, Supprimer l’horizon pour commencer au premier enregistrement disponible ou À l’horodatage pour spécifier un horodatage à partir duquel commencer la lecture.

    8. (Facultatif) Pour VPC, choisissez l’Amazon VPC pour votre cluster Kafka. Ensuite, choisissez le VPC subnets (Sous-réseaux VPC) et les VPC security groups (Groupes de sécurité VPC).

      Ce paramètre est requis si seuls des utilisateurs au sein de votre VPC accèdent à vos courtiers.

    9. (Facultatif) Pour Authentication (Authentification), choisissez Add (Ajouter), puis procédez comme suit :

      1. Choisissez le protocole d’accès ou d’authentification des courtiers Kafka dans votre cluster.

        • Si votre agent Kafka utilise l’authentification SASL/PLAIN, choisissez BASIC_AUTH.

        • Si votre agent utilise une authentification SASL/SCRAM, choisissez l’un des protocoles SASL_SCRAM.

        • Si vous configurez l’authentification mTLS, choisissez le protocole CLIENT_CERTIFICATE_TLS_AUTH.

      2. Pour l’authentification SASL/SCRAM ou mTLS, choisissez le nom de la clé secrète Secrets Manager contenant les informations d’identification de votre cluster Kafka.

    10. (Facultatif) Pour Encryption (Chiffrement), choisissez le secret Secrets Manager contenant le certificat d’autorité de certification racine que vos courtiers Kafka utilisent pour le chiffrement TLS, si vos courtiers Kafka utilisent des certificats signés par une autorité de certification privée.

      Ce paramètre s’applique au chiffrement TLS pour SASL/SCRAM ou SASL/PLAIN, ainsi qu’à l’authentification mTLS.

    11. Pour créer le déclencheur dans un état désactivé pour le test (recommandé), désactivez Enable trigger (Activer le déclencheur). Ou, pour activer le déclencheur immédiatement, sélectionnezActiver un déclencheur.

  5. Pour créer le déclencheur, choisissez Add (Ajouter).

Ajout d’un cluster Kafka autogéré (AWS CLI)

Utilisez les exemples de AWS CLI commandes suivants pour créer et afficher un déclencheur Apache Kafka autogéré pour votre fonction Lambda.

Utilisation de SASL/SCRAM

Si des utilisateurs de Kafka accèdent à vos courtiers Kafka via Internet, spécifiez le secret Secrets Manager que vous avez créé pour l’authentification SASL/SCRAM. L'exemple suivant utilise la create-event-source-mapping AWS CLI commande pour mapper une fonction Lambda nommée my-kafka-function à une rubrique Kafka nommée. AWSKafkaTopic

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Utilisation d’un VPC

Si seuls des utilisateurs de Kafka au sein de votre VPC accèdent à vos agents Kafka, vous devez spécifier votre VPC, vos sous-réseaux et votre groupe de sécurité de VPC. L'exemple suivant utilise la create-event-source-mapping AWS CLI commande pour mapper une fonction Lambda nommée my-kafka-function à une rubrique Kafka nommée. AWSKafkaTopic

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Affichage de l'état à l'aide du AWS CLI

L'exemple suivant utilise la get-event-source-mapping AWS CLI commande pour décrire l'état du mappage des sources d'événements que vous avez créé.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Utilisation d’un cluster Kafka en tant que source d’événement

Lorsque vous ajoutez votre cluster Apache Kafka comme déclencheur pour votre fonction Lambda, le cluster est utilisé comme source d’événement.

Lambda lit les données d'événements des sujets Kafka que vous spécifiez Topics dans une CreateEventSourceMappingdemande, en fonction de StartingPosition ce que vous spécifiez. Lorsque le traitement a réussi, votre rubrique Kafka est validée dans votre cluster Kafka.

Si vous spécifiez StartingPosition comme LATEST, Lambda commence à lire à partir du dernier message dans chaque partition de la rubrique. Un certain temps pouvant s’écouler après la configuration du déclencheur avant que Lambda commence à lire les messages, Lambda ne lit aucun message produit pendant cette fenêtre de temps.

Lambda traite les registres d’une ou plusieurs partitions de rubrique Kafka que vous spécifiez et envoie une charge utile JSON à votre fonction Lambda. Lorsque d'autres enregistrements sont disponibles, Lambda continue de traiter les enregistrements par lots, en fonction de la BatchSize valeur que vous spécifiez dans une CreateEventSourceMappingdemande, jusqu'à ce que votre fonction aborde le sujet.

Si votre fonction renvoie une erreur pour l’un des messages d’un lot, Lambda réessaie le lot de messages complet jusqu’à ce que le traitement réussisse ou que les messages expirent. Vous pouvez envoyer les enregistrements qui échouent à toutes les tentatives vers une destination en cas d'échec pour un traitement ultérieur.

Note

Alors que les fonctions Lambda ont généralement un délai d’expiration maximal de 15 minutes, les mappages des sources d’événement pour Amazon MSK, Apache Kafka autogéré, Amazon DocumentDB et Amazon MQ pour ActiveMQ et RabbitMQ ne prennent en charge que les fonctions dont le délai d’expiration maximal est de 14 minutes. Cette contrainte garantit que le mappage des sources d’événements peut gérer correctement les erreurs de fonction et effectuer de nouvelles tentatives.

Positions de départ des interrogations et des flux

Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.

  • Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.

  • Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.

Ce comportement signifie que si vous spécifiez LATEST comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON ou AT_TIMESTAMP.

Mise à l’échelle automatique de la source d’événement Kafka

Lorsque vous créez initialement une source d’événement Apache Kafka, Lambda alloue un consommateur pour traiter toutes les partitions de la rubrique Kafka. Chaque consommateur dispose de plusieurs processeurs exécutés en parallèle pour gérer des charges de travail accrues. Lambda augmente ou diminue automatiquement le nombre de consommateurs, en fonction de la charge de travail. Pour préserver l’ordre des messages dans chaque partition, le nombre maximum de consommateurs est de un par partition dans la rubrique.

Toutes les minutes, Lambda évalue le décalage de consommateurs de toutes les partitions dans la rubrique. Si le décalage est trop élevé, la partition reçoit des messages plus rapidement que Lambda ne peut les traiter. Si nécessaire, Lambda ajoute ou supprime des consommateurs dans la rubrique. Le processus de mise à l’échelle consistant à ajouter ou à supprimer des consommateurs a lieu dans les trois minutes suivant l’évaluation.

Si votre fonction Lambda cible est surchargée, Lambda réduit le nombre de consommateurs. Cette action réduit la charge de travail de la fonction en diminuant le nombre de messages que les consommateurs peuvent échanger avec la fonction.

Pour surveiller le débit de votre rubrique Kafka, vous pouvez afficher les métriques de consommateurs Apache Kafka, telles que consumer_lag et consumer_offset. Pour vérifier le nombre d’invocations de fonctions qui se produisent en parallèle, vous pouvez également surveiller les métriques de simultanéité pour votre fonction.

Opérations d’API de source d’événement

Lorsque vous ajoutez votre cluster Kafka comme source d'événements pour votre fonction Lambda à l'aide de la console Lambda, AWS d'un SDK ou du AWS CLI, Lambda utilise des API pour traiter votre demande.

Pour gérer une source d’événement à l’aide de la AWS Command Line Interface (AWS CLI) ou d’un AWS SDK, vous pouvez utiliser les opérations d’API suivantes :

API de mappage des sources d’événements

Lorsque vous ajoutez votre cluster Apache Kafka en tant que source d’événement pour votre fonction Lambda, si votre fonction rencontre une erreur, votre consommateur Kafka cesse de traiter les registres. Les consommateurs d’une partition de rubrique sont ceux qui s’abonnent à vos registres et qui les lisent et traitent. Vos autres consommateurs Kafka peuvent continuer à traiter les registres, à condition qu’ils ne rencontrent pas la même erreur.

Afin d’identifier les raisons pour lesquelles un consommateur cesse son traitement, vérifiez le champ StateTransitionReason dans la réponse de EventSourceMapping. La liste suivante décrit les erreurs de source d’événement que vous pouvez recevoir :

ESM_CONFIG_NOT_VALID

La configuration de mappage de source d’événement n’est pas valide.

EVENT_SOURCE_AUTHN_ERROR

Lambda n’a pas pu authentifier la source d’événement.

EVENT_SOURCE_AUTHZ_ERROR

Lambda ne dispose pas des autorisations requises pour accéder à la source d’événement.

FUNCTION_CONFIG_NOT_VALID

La configuration de la fonction n’est pas valide.

Note

Si vos registres d’événement Lambda dépassent la limite de taille autorisée de 6 Mo, il se peut qu’ils ne soient pas traités.

CloudWatch Métriques Amazon

Lambda émet la métrique OffsetLag pendant que votre fonction traite les registres. La valeur de cette métrique est la différence de décalage entre le dernier enregistrement inscrit dans la rubrique source de l’événement Kafka et le dernier enregistrement traité par le groupe de consommateurs de votre fonction. Vous pouvez utiliser OffsetLag pour estimer la latence entre le moment où un enregistrement est ajouté et celui où votre groupe de consommateurs le traite.

Une tendance à la hausse de OffsetLag peut indiquer des problèmes liés aux sondeurs dans le groupe de consommateurs de votre fonction. Pour plus d’informations, consultez Utilisation des métriques de fonction Lambda.

Paramètres de configuration Apache Kafka autogérés

Tous les types de sources d'événements Lambda partagent les mêmes opérations CreateEventSourceMappinget les mêmes opérations d'UpdateEventSourceMappingAPI. Cependant, seuls certains paramètres s’appliquent à Apache Kafka.

Paramètres de source d’événement qui s’appliquent à Apache Kafka autogéré
Paramètre Obligatoire Par défaut Remarques

BatchSize

N

100

Maximum : 10 000.

Activé

N

Activées

FunctionName

Y

FilterCriteria

N

Filtrage des événements Lambda

MaximumBatchingWindowInSeconds

N

500 ms

Comportement de traitement par lots

SelfManagedEventSource

Y

Liste des agents Kafka. Peut définir uniquement sur Create (Créer)

SelfManagedKafkaEventSourceConfig

N

Contient le ConsumerGroupId champ qui prend par défaut une valeur unique.

Peut définir uniquement sur Create (Créer)

SourceAccessConfigurations

N

Pas d’informations d’identification

Informations sur le VPC ou informations d’authentification pour le cluster

Pour SASL_PLAIN, définissez la valeur BASIC_AUTH

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON ou DERNIER

Peut définir uniquement sur Create (Créer)

StartingPositionTimestamp

N

Obligatoire s'il StartingPosition est défini sur AT_TIMESTAMP

Rubriques

O

Nom de la rubrique

Peut définir uniquement sur Create (Créer)