Utilisation de Lambda avec Amazon MSK - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de Lambda avec Amazon MSK

Note

Si vous souhaitez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) est un service entièrement géré qui vous permet de créer et d’exécuter des applications qui utilisent Apache Kafka pour traiter des données en streaming. Amazon MSK simplifie la configuration, la mise à l’échelle et la gestion des clusters exécutant Kafka. Amazon MSK facilite également la configuration de votre application pour plusieurs zones de disponibilité et pour des raisons de sécurité avec AWS Identity and Access Management (IAM). Amazon MSK prend en charge plusieurs versions open source de Kafka.

Amazon MSK en tant que source d’événement fonctionne de la même manière qu’Amazon Simple Queue Service (Amazon SQS) ou Amazon Kinesis. Lambda interroge en interne les nouveaux messages de la source d’événement, puis invoque de manière synchrone la fonction Lambda cible. Lambda lit les messages par lot et les fournit à votre fonction en tant que charge utile d’événement. La taille maximale du lot est configurable (la valeur par défaut est de 100 messages). Pour plus d’informations, consultez Comportement de traitement par lots.

Note

Alors que les fonctions Lambda ont généralement un délai d’expiration maximal de 15 minutes, les mappages des sources d’événement pour Amazon MSK, Apache Kafka autogéré, Amazon DocumentDB et Amazon MQ pour ActiveMQ et RabbitMQ ne prennent en charge que les fonctions dont le délai d’expiration maximal est de 14 minutes. Cette contrainte garantit que le mappage des sources d’événements peut gérer correctement les erreurs de fonction et effectuer de nouvelles tentatives.

Lambda lit les messages séquentiellement pour chaque partition. Une seule charge utile Lambda peut contenir des messages provenant de plusieurs partitions. Après avoir traité chaque lot, Lambda valide les décalages des messages dans celui-ci. Si votre fonction renvoie une erreur pour l'un des messages d'un lot, Lambda réessaie le lot de messages complet jusqu'à ce que le traitement réussisse ou que les messages expirent.

Avertissement

Les mappages de sources d'événements Lambda traitent chaque événement au moins une fois, et le traitement des enregistrements peut être dupliqué. Pour éviter les problèmes potentiels liés à des événements dupliqués, nous vous recommandons vivement de rendre votre code de fonction idempotent. Pour en savoir plus, consultez Comment rendre ma fonction Lambda idempotente dans le Knowledge Center. AWS

Pour un exemple de configuration d'Amazon MSK en tant que source d'événements, consultez la section Utilisation d'Amazon MSK en tant que source d'événements AWS Lambda sur le blog AWS Compute. Consultez Intégration Lambda Amazon MSK dans Amazon MSK Labs pour un didacticiel complet.

Exemple d’évènement

Lambda envoie le lot de messages dans le paramètre d’événement quand il invoque votre fonction. La charge utile d’un événement contient un tableau de messages. Chaque élément de tableau contient des détails de la rubrique Amazon MSK et un identifiant de partition, ainsi qu’un horodatage et un message codé en base 64.

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Authentification du cluster MSK

Lambda doit être autorisé à accéder au cluster Amazon MSK, à récupérer des enregistrements et à effectuer d’autres tâches. Amazon MSK prend en charge plusieurs options de contrôle de l’accès client au cluster MSK.

Accès non authentifié

Si aucun client n’accède au cluster via Internet, vous pouvez utiliser un accès non authentifié.

Authentification SASL/SCRAM

Amazon MSK prend en charge l’authentification SASL/SCRAM (Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism) avec chiffrement du protocole TLS (Transport Layer Security). Pour que Lambda puisse se connecter au cluster, vous devez stocker les informations d'authentification (nom d'utilisateur et mot de passe) dans un AWS Secrets Manager secret.

Pour plus d’informations sur l’utilisation de Secrets Manager, consultez Authentification par nom d’utilisateur et mot de passe avec AWS Secrets Manager dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

Notez qu’Amazon MSK ne prend pas en charge l’authentification SASL/PLAIN.

Authentification basée sur les rôles IAM

Vous pouvez utiliser IAM pour authentifier l’identité des clients qui se connectent au cluster MSK. Si l’authentification IAM est active sur votre cluster MSK et que vous ne fournissez pas de secret pour l’authentification, Lambda utilise automatiquement l’authentification IAM par défaut. Pour créer et déployer des politiques basées sur les utilisateurs ou les rôles, utilisez la console IAM ou l’API. Pour plus d’informations, consultez Contrôle d’accès IAM dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

Pour permettre à Lambda de se connecter au cluster MSK, de lire des enregistrements et d’effectuer d’autres actions requises, ajoutez les autorisations suivantes à votre rôle d’exécution de fonction.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

Vous pouvez étendre ces autorisations à un cluster, une rubrique et un groupe spécifiques. Pour plus d’informations, consultez Actions Amazon MSK Kafka dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

Authentification TLS mutuelle

Mutual TLS (mTLS) fournit une authentification bidirectionnelle entre le client et le serveur. Le client envoie un certificat au serveur pour que le serveur vérifie le client, et le serveur envoie un certificat au client pour que le client vérifie le serveur.

Pour Amazon MSK, Lambda agit en tant que client. Vous configurez un certificat client (en tant que secret dans Secrets Manager) pour authentifier Lambda auprès des courtiers de votre cluster MSK. Le certificat client doit être signé par une autorité de certification dans le magasin d’approbations du serveur. Le cluster MSK envoie un certificat de serveur à Lambda pour authentifier les courtiers auprès de Lambda. Le certificat du serveur doit être signé par une autorité de certification (CA) présente dans le AWS trust store.

Pour obtenir des instructions sur la façon de générer un certificat client, consultez Présentation de l’authentification TLS mutuelle pour Amazon MSK en tant que source d’événement.

Amazon MSK ne prend pas en charge les certificats de serveur auto-signés, car tous les courtiers d’Amazon MSK utilisent des certificats publics signés par des autorités de certification Amazon Trust Services, auquel Lambda fait confiance par défaut.

Pour plus d’informations sur le protocole mTLS pour Amazon MSK, consultez Authentification Mutual TLS dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

Configuration du secret mTLS

Le secret CLIENT_CERTIFICATE_TLS_AUTH nécessite un champ de certificat et un champ de clé privée. Pour une clé privée chiffrée, le secret nécessite un mot de passe de clé privée. Le certificat et la clé privée doivent être au format PEM.

Note

Lambda prend en charge les algorithmes de chiffrement de clés privées PBES1 (mais pas PBES2).

Le champ de certificat doit contenir une liste de certificats, commençant par le certificat client, suivi de tous les certificats intermédiaires et se terminant par le certificat racine. Chaque certificat doit commencer sur une nouvelle ligne avec la structure suivante :

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager prend en charge les secrets jusqu’à 65 536 octets, ce qui offre suffisamment d’espace pour de longues chaînes de certificats.

La clé privée doit être au format PKCS #8, avec la structure suivante :

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

Pour une clé privée chiffrée, utilisez la structure suivante :

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

L’exemple suivant affiche le contenu d’un secret pour l’authentification mTLS à l’aide d’une clé privée chiffrée. Pour une clé privée chiffrée, vous devez inclure le mot de passe de clé privée dans le secret.

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Comment Lambda choisit un courtier bootstrap

Lambda choisit un courtier d’amorçage en fonction des méthodes d’authentification disponibles sur votre cluster, et si vous fournissez un secret pour l’authentification. Si vous fournissez un secret pour les mTLS ou SASL/SCRAM, Lambda choisit automatiquement cette méthode d’authentification. Si vous ne e faites pas, Lambda sélectionne la méthode d’authentification la plus puissante active sur votre cluster. Voici l’ordre de priorité dans lequel Lambda sélectionne un courtier, de l’autorisation la plus forte à la plus faible :

  • MTLs (secret fourni pour les mTLS)

  • SASL/SCRAM (secret fourni pour SASL/SCRAM)

  • SASL IAM (aucun secret fourni et authentification IAM active)

  • TLS non authentifié (aucun secret fourni et authentification IAM non active)

  • Texte brut (aucun secret n’est fourni, et l’authentification IAM et le TLS non authentifié ne sont pas actifs)

Note

Si Lambda ne parvient pas à se connecter au type de courtier le plus sécurisé, Lambda n’essaie pas de se connecter à un type de courtier différent (plus faible). Si vous souhaitez que Lambda choisisse un type de courtier plus faible, désactivez toutes les méthodes d’authentification plus puissantes sur votre cluster.

Gestion des accès et des autorisations d’API

En plus d’accéder au cluster Amazon MSK, votre fonction a besoin d’autorisations pour effectuer diverses actions de l’API Amazon MSK. Ajoutez les autorisations d’écriture au rôle d’exécution de votre fonction. Si vos utilisateurs ont besoin d’accéder à l’une des actions de l’API Amazon MSK, ajoutez les autorisations requises à la politique d’identité de l’utilisateur ou du rôle correspondant.

Vous pouvez ajouter manuellement chacune des autorisations suivantes à votre rôle d’exécution. Vous pouvez également associer la politique AWS gérée AWSLambdaMSKExecutionRoleà votre rôle d'exécution. La politique AWSLambdaMSKExecutionRole contient toutes les actions d’API requises et les autorisations VPC répertoriées ci-dessous.

Autorisations de rôle d’exécution de fonction Lambda requises

Pour créer et stocker des journaux dans un groupe de CloudWatch journaux dans Amazon Logs, votre fonction Lambda doit disposer des autorisations suivantes dans son rôle d'exécution :

Pour que Lambda puisse accéder à votre cluster Amazon MSK en votre nom, votre fonction Lambda doit disposer des autorisations suivantes dans son rôle d’exécution :

Vous n’avez besoin d’ajouter que kafka:DescribeCluster ou kafka:DescribeClusterV2. Pour les clusters MSK alloués, l’une ou l’autre des autorisations fonctionne. Pour les clusters MSK sans serveur, vous devez utiliser kafka:DescribeClusterV2.

Note

Lambda prévoit à terme de supprimer l’autorisation kafka:DescribeCluster de la politique gérée AWSLambdaMSKExecutionRole associée. Si vous utilisez cette politique, vous devez migrer toutes les applications utilisant kafka:DescribeCluster pour qu’elles utilisent kafka:DescribeClusterV2 à la place.

Autorisations VPC

Si seuls des utilisateurs au sein d’un VPC peuvent accéder à votre cluster Amazon MSK, votre fonction Lambda doit être autorisée à accéder à vos ressources Amazon VPC. Ces ressources incluent les sous-réseaux, groupes de sécurité et interfaces réseau de votre VPC. Pour accéder à ces ressources, le rôle d'exécution de votre fonction doit disposer des autorisations suivantes. Ces autorisations sont incluses dans la politique AWSLambdaMSKExecutionRole AWS gérée.

Autorisations de fonction Lambda facultatives

Votre fonction Lambda peut également nécessiter ces autorisations pour :

  • Accédez à votre secret SCRAM, si vous utilisez l’authentification SASL/SCRAM.

  • Décrivez votre secret Secrets Manager.

  • Accédez à votre AWS Key Management Service (AWS KMS) clé gérée par le client.

  • Envoyez les enregistrements des invocations ayant échoué vers une destination.

Secrets Manager et AWS KMS autorisations

Selon le type de contrôle d'accès que vous configurez pour vos courtiers Amazon MSK, votre fonction Lambda peut avoir besoin d'une autorisation pour accéder à votre secret SCRAM (si vous utilisez l'authentification SASL/SCRAM) ou au secret Secrets Manager pour déchiffrer votre clé gérée par le client. AWS KMS Afin d’accéder à ces ressources, le rôle d’exécution de votre fonction doit disposer des autorisations suivantes :

Envoi d’enregistrements vers une destination

Si vous souhaitez envoyer des enregistrements d'invocation ayant échoué vers une destination en cas d’échec, votre fonction Lambda doit être autorisée à envoyer ces enregistrements. Pour les mappages de sources d'événements Kafka, vous pouvez choisir entre une rubrique Amazon SNS, une file d'attente Amazon SQS ou un compartiment Amazon S3 comme destination. Pour envoyer des enregistrements à une rubrique SNS, le rôle d’exécution de votre fonction doit avoir les autorisations suivantes :

Pour envoyer des enregistrements à une rubrique SQS, le rôle d’exécution de votre fonction doit avoir les autorisations suivantes :

Pour envoyer des enregistrements à un compartiment S3, le rôle d’exécution de votre fonction doit avoir les autorisations suivantes :

En outre, si vous avez configuré une clé KMS sur votre destination, Lambda a besoin des autorisations suivantes en fonction du type de destination :

Ajout d’autorisations à votre rôle d’exécution

Suivez ces étapes pour ajouter la politique AWS gérée AWSLambdaMSKExecutionRoleà votre rôle d'exécution à l'aide de la console IAM.

Pour ajouter une politique AWS gérée
  1. Ouvrez la page Policies (Stratégies) de la console IAM.

  2. Dans la zone de recherche, saisissez le nom de la stratégie (AWSLambdaMSKExecutionRole).

  3. Sélectionnez la stratégie dans la liste, puis choisissez Policy actions (Actions de stratégie), Attach (Attacher).

  4. Sur la page Attach policy (Attacher une politique), sélectionnez votre rôle d’exécution dans la liste, puis choisissez Attach policy (Attacher une politique).

Octroi d’accès à des utilisateurs avec une politique IAM

Par défaut, les utilisateurs et les rôles ne sont pas autorisés à effectuer des opérations d’API Amazon MSK. Pour accorder l’accès aux utilisateurs de votre organisation ou de votre compte, vous pouvez ajouter ou mettre à jour une politique basée sur l’identité. Pour plus d’informations, consultez Exemples de politiques basées sur une identité Amazon MSK dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

Authentification et erreurs d’autorisation

Si l'une des autorisations requises pour utiliser les données du cluster Amazon MSK est manquante, Lambda affiche l'un des messages d'erreur suivants dans le mappage des sources d'événements ci-dessous. LastProcessingResult

Le cluster n’a pas pu autoriser Lambda

Pour SASL/SCRAM ou mTLS, cette erreur indique que l’utilisateur fourni ne dispose pas de toutes les autorisations de liste de contrôle d’accès (ACL) Kafka requises suivantes :

  • DescribeConfigs Cluster

  • Décrire un groupe

  • Lire le groupe

  • Décrire la rubrique

  • Lire la rubrique

Pour le contrôle d’accès IAM, le rôle d’exécution de fonction ne dispose pas d’une ou de plusieurs autorisations requises pour accéder au groupe ou à la rubrique. Passez en revue la liste des autorisations requises dans Authentification basée sur les rôles IAM.

Lorsque vous créez des listes ACL Kafka ou une stratégie IAM avec les autorisations requises de cluster Kafka, vous devez spécifier la rubrique et le groupe en tant que ressources. Le nom de la rubrique doit correspondre à la rubrique dans le mappage des sources d’événements. Le nom du groupe doit correspondre à l’UUID du mappage des sources d’événements.

Une fois que vous avez ajouté les autorisations requises au rôle d’exécution, il peut y avoir un délai de plusieurs minutes avant que les modifications ne prennent effet.

Échec de l’authentification SASL

Pour SASL/SCRAM, cet échec indique que le nom d’utilisateur et le mot de passe fournis ne sont pas valides.

Pour le contrôle d’accès IAM, le rôle d’exécution est manquant kafka-cluster:Connect autorisations pour le cluster. Ajoutez cette autorisation au rôle et spécifiez l’Amazon Resource Name (ARN) du cluster en tant que ressource.

Cette erreur peut se produire de façon intermittente. Le cluster rejette les connexions une fois que le nombre de connexions TCP dépasse le Quota de service Amazon MSK. Lambda fait marche arrière et tente de nouveau jusqu’à ce qu’une connexion soit réussie. Une fois que Lambda se connecte au cluster et interroge les enregistrements, le dernier résultat de traitement passe à OK.

Le serveur n’a pas réussi à authentifier Lambda

Cette erreur indique que les courtiers Amazon MSK Kafka n’ont pas réussi à s’authentifier auprès de Lambda. Cette erreur se produit dans les conditions suivantes :

  • Vous n’avez pas fourni de certificat client pour l’authentification mTLS.

  • Vous avez fourni un certificat client, mais les courtiers ne sont pas configurés pour utiliser les mTLS.

  • Les courtiers ne font pas confiance à un certificat client.

Le certificat ou la clé privée fournis n’est pas valide

Cette erreur indique que le consommateur Amazon MSK n’a pas pu utiliser le certificat ou la clé privée fournis. Assurez-vous que le certificat et la clé utilisent le format PEM et que le chiffrement de la clé privée utilise un algorithme PBES1.

Configuration réseau

Pour que Lambda puisse utiliser votre cluster Kafka comme source d'événements, il doit avoir accès au VPC Amazon dans lequel réside votre cluster. Nous vous recommandons de déployer des points de terminaison AWS PrivateLink VPC pour que Lambda puisse accéder à votre VPC. Déployez des points de terminaison pour Lambda AWS Security Token Service et AWS STS(). Si l’agent utilise l’authentification, déployez également un point de terminaison de VPC pour Secrets Manager. Si vous avez configuré une destination en cas d’échec, déployez également un point de terminaison de VPC pour le service de destination.

Vous pouvez aussi vous assurer que le VPC associé à votre cluster Kafka inclut une passerelle NAT par sous-réseau public. Pour plus d’informations, consultez Activer l'accès à Internet pour les fonctions Lambda connectées au VPC.

Si vous utilisez des points de terminaison VPC, vous devez également les configurer pour activer les noms DNS privés.

Lorsque vous créez un mappage de source d'événements pour un cluster MSK, Lambda vérifie si des interfaces réseau élastiques (ENI) sont déjà présentes pour les sous-réseaux et les groupes de sécurité du VPC de votre cluster. Si Lambda trouve des ENI existants, il tente de les réutiliser. Sinon, Lambda crée de nouveaux ENI pour se connecter à la source de l'événement et appeler votre fonction.

Note

Les fonctions Lambda s'exécutent toujours dans des VPC appartenant au service Lambda. Ces VPC sont gérés automatiquement par le service et ne sont pas visibles pour les clients. Vous pouvez également connecter votre fonction à un Amazon VPC. Dans les deux cas, la configuration VPC de votre fonction n'affecte pas le mappage des sources d'événements. Seule la configuration du VPC de la source d'événements détermine la manière dont Lambda se connecte à votre source d'événements.

Votre configuration Amazon VPC est détectable via l’API Amazon MSK. Vous n’avez pas besoin de le configurer pendant la configuration à l’aide de la commande create-event-source-mapping.

Pour plus d'informations sur la configuration du réseau, consultez Configuration AWS Lambda avec un cluster Apache Kafka au sein d'un VPC sur AWS le blog Compute.

Règles du groupe de sécurité VPC

Configurez les groupes de sécurité pour l'Amazon VPC contenant votre cluster avec les règles suivantes (au minimum) :

  • Règles entrantes : autorise tout le trafic sur le port de l’agent Amazon MSK (9092 pour le texte brut, 9094 pour TLS, 9096 pour SASL, 9098 pour IAM) pour les groupes de sécurité spécifiés pour votre source d’événements.

  • Règles sortantes : autorisent tout le trafic sur le port 443 pour toutes les destinations. Autorisez tout le trafic sur le port de l’agent Amazon MSK (9092 pour le texte brut, 9094 pour TLS, 9096 pour SASL, 9098 pour IAM) pour les groupes de sécurité spécifiés pour votre source d’événements.

  • Si vous utilisez des points de terminaison d’un VPC au lieu de la passerelle NAT, les groupes de sécurité associés aux points de terminaison du VPC doivent autoriser tout le trafic entrant sur le port 443 à partir des groupes de sécurité des sources d’événement.

Utilisation des points de terminaison d'un VPC

Lorsque vous utilisez des points de terminaison VPC, les appels d'API pour appeler votre fonction sont acheminés via ces points de terminaison à l'aide des ENI. Le principal du service Lambda doit appeler sts:AssumeRole et appeler tous lambda:InvokeFunction les rôles et fonctions utilisant ces ENI.

Par défaut, les points de terminaison VPC ont des politiques IAM ouvertes. La meilleure pratique consiste à restreindre ces politiques afin de n'autoriser que des principaux spécifiques à effectuer les actions nécessaires à l'aide de ce point de terminaison. Pour garantir que le mappage de votre source d'événements peut appeler votre fonction Lambda, la politique de point de terminaison VPC doit autoriser le principe du service Lambda à appeler et. sts:AssumeRole lambda:InvokeFunction Le fait de restreindre vos politiques de point de terminaison VPC pour autoriser uniquement les appels d'API provenant de votre organisation empêche le mappage des sources d'événements de fonctionner correctement.

Les exemples de politiques de point de terminaison VPC suivants montrent comment accorder l'accès requis au principal de service Lambda pour les points de terminaison Lambda et AWS STS Lambda.

Exemple Politique de point de terminaison VPC - point de terminaison AWS STS
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
Exemple Politique de point de terminaison VPC - Point de terminaison Lambda
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

Si votre courtier Kafka utilise l'authentification, vous pouvez également restreindre la politique de point de terminaison VPC pour le point de terminaison Secrets Manager. Pour appeler l'API Secrets Manager, Lambda utilise votre rôle de fonction, et non le principal du service Lambda. L'exemple suivant montre une politique de point de terminaison de Secrets Manager.

Exemple Politique relative aux points de terminaison VPC - Point de terminaison Secrets Manager
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

Si vous avez configuré une destination en cas de défaillance, Lambda utilise également le rôle de votre fonction pour appeler s3:PutObject sqs:sendMessage ou utiliser les ENI sns:Publish gérés par Lambda.

Ajout d’Amazon MSK en tant que source d’événement

Pour créer un mappage des sources d’événements, ajoutez Amazon MSK en tant que déclencheur de fonction Lambda à l’aide de la console Lambda, d’un kit SDK AWS, ou de l’AWS Command Line Interface (AWS CLI). Notez que lorsque vous ajoutez Amazon MSK en tant que déclencheur, Lambda prend en compte les paramètres VPC du cluster Amazon MSK, et non les paramètres VPC de la fonction Lambda.

Cette section explique comment créer un mappage de source d'événement à l'aide de la console Lambda et de l' AWS CLI.

Prérequis

  • Un cluster Amazon MSK et une rubrique Kafka. Pour plus d’informations, consultez Mise en route avec Amazon MSK dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

  • Rôle d'exécution autorisé à accéder aux AWS ressources utilisées par votre cluster MSK.

Identifiant de groupe de consommateurs personnalisable

Lorsque vous configurez Kafka comme source d’événements, vous pouvez spécifier un identifiant de groupe de consommateurs. Cet identifiant de groupe de consommateurs est un identifiant existant pour le groupe de clients Kafka auquel vous souhaitez rattacher votre fonction Lambda. Vous pouvez utiliser cette fonction pour migrer facilement toutes les configurations de traitement d’enregistrements Kafka en cours depuis d’autres clients vers Lambda.

Si vous spécifiez un identifiant de groupe de consommateurs et qu’il existe d’autres sondeurs actifs au sein de ce groupe de consommateurs, Kafka distribue des messages à tous les consommateurs. En d’autres termes, Lambda ne reçoit pas l’intégralité du message relatif au sujet Kafka. Si vous souhaitez que Lambda gère tous les messages de la rubrique, désactivez tous les autres sondeurs de ce groupe de consommateurs.

De plus, si vous spécifiez un identifiant de groupe de consommateurs et que Kafka trouve un groupe de consommateurs existant valide avec le même identifiant, Lambda ignore le paramètre StartingPosition pour le mappage des sources d’événements. Lambda commence plutôt à traiter les enregistrements en fonction de la compensation engagée par le groupe de consommateurs. Si vous spécifiez un identifiant de groupe de consommateurs et que Kafka ne trouve aucun groupe de consommateurs existant, Lambda configure votre source d’événement avec le StartingPosition spécifié.

L’identifiant du groupe de consommateurs que vous spécifiez doit être unique parmi toutes vos sources d’événements Kafka. Après avoir créé un mappage des sources d’événements Kafka avec l’identifiant de groupe de consommateurs spécifié, vous ne pouvez plus mettre à jour cette valeur.

Destinations en cas d’échec

Pour retenir les enregistrements des invocations échouées ou des charges utiles surdimensionnées provenant de votre source d’événements Kafka, configurez une destination en cas d’échec pour votre fonction. Quand une invocation échoue, Lambda envoie à votre destination un enregistrement JSON avec des détails sur l’invocation.

Vous pouvez choisir entre une rubrique Amazon SNS, une file d’attente Amazon SQS ou un compartiment Amazon S3 comme destination. Pour les destinations de rubriques SNS ou de files d’attente SQS, Lambda envoie les métadonnées d’enregistrement à la destination. Pour les destinations de compartiment S3, Lambda envoie l’intégralité de l’enregistrement d’invocation ainsi que les métadonnées à la destination.

Pour que Lambda envoie correctement les enregistrements vers la destination que vous avez choisie, assurez-vous que le rôle d’exécution de votre fonction contient les autorisations appropriées. Le tableau décrit également comment chaque type de destination reçoit l’enregistrement d’invocation JSON.

Type de destination Pris en charge pour les sources d’événements suivantes Autorisations nécessaires Format JSON spécifique à la destination

File d’attente Amazon SQS

  • Kinesis

  • DynamoDB

  • Apache Kafka autogéré et Apache Kafka géré

Lambda transmet les métadonnées d’enregistrement d’invocation en tant que Message à la destination.

Rubrique Amazon SNS

  • Kinesis

  • DynamoDB

  • Apache Kafka autogéré et Apache Kafka géré

Lambda transmet les métadonnées d’enregistrement d’invocation en tant que Message à la destination.

Compartiment Amazon S3

  • Apache Kafka autogéré et Apache Kafka géré

Lambda stocke l’enregistrement d’invocation ainsi que ses métadonnées à la destination.

Astuce

La bonne pratique consiste à n’inclure les autorisations minimales requises que dans votre rôle d’exécution.

Destinations SNS et SQS

L’exemple suivant montre ce que Lambda envoie à une rubrique SNS ou à une destination de file d’attente SQS pour une invocation de source d’événement Kafka qui a échoué. Chacune des clés sous recordsInfo contient à la fois le sujet et la partition Kafka, séparés par un trait d’union. Par exemple, pour la clé"Topic-0", Topic est la rubrique Kafka, et 0 est la partition. Pour chaque sujet et chaque partition, vous pouvez utiliser les décalages et les données d’horodatage pour trouver les enregistrements d’invocation d’origine.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Destinations S3

Pour les destinations S3, Lambda envoie l’intégralité de l’enregistrement d’invocation ainsi que les métadonnées à la destination. L’exemple suivant montre que Lambda envoie vers une destination de compartiment S3 en cas d’échec d’une invocation de source d’événement Kafka. Outre tous les champs de l’exemple précédent pour les destinations SQS et SNS, le champ payload contient l’enregistrement d’invocation d’origine sous forme de chaîne JSON échappée.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Astuce

Nous vous recommandons d’activer la gestion des versions S3 sur votre compartiment de destination.

Configuration des destinations en cas de panne

Pour configurer une destination en cas de panne à l’aide de la console, procédez comme suit :

  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez une fonction.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add destination (Ajouter une destination).

  4. Pour Source, choisissez Invocation du mappage des sources d’événements.

  5. Pour le mappage des sources d’événements, choisissez une source d’événements configurée pour cette fonction.

  6. Pour Condition, sélectionnez En cas d’échec. Pour les invocations de mappage des sources d’événements, il s’agit de la seule condition acceptée.

  7. Pour Type de destination, choisissez le type de destination auquel Lambda envoie les enregistrements d’invocation.

  8. Pour Destination, choisissez une ressource.

  9. Choisissez Enregistrer.

Vous pouvez également configurer une destination en cas de panne à l’aide de l’API Lambda. Par exemple, la commande CreateEventSourceMappingCLI suivante ajoute une destination SQL en cas de défaillance à : MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

La commande UpdateEventSourceMappingCLI suivante ajoute une destination S3 en cas de défaillance à la source d'événements Kafka associée à l'entrée : uuid

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Pour supprimer une destination, entrez une chaîne vide comme argument du paramètre destination-config :

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Ajout d’un déclencheur Amazon MSK (console)

Suivez ces étapes afin d’ajouter votre cluster Amazon MSK et une rubrique Kafka en tant que déclencheur pour votre fonction Lambda.

Pour ajouter un déclencheur Amazon MSK à votre fonction Lambda (console)
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez le nom de votre fonction Lambda.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).

  4. Sous Trigger configuration (Configuration du déclencheur), procédez comme suit :

    1. Choisissez le type de déclencheur MSK.

    2. Pour MSK cluster (Cluster MSK), sélectionnez votre cluster.

    3. Pour Batch size (Taille de lot), entrez le nombre maximum d’enregistrements à recevoir dans un même lot.

    4. Pour Batch window, veuillez saisir l’intervalle de temps maximal en secondes nécessaire à Lambda pour collecter des enregistrements avant d’invoquer la fonction.

    5. Dans Topic name (Nom de la rubrique), saisissez le nom d’une rubrique Kafka.

    6. (Facultatif) Pour l’identifiant de groupe de consommateurs, entrez l’identifiant d’un groupe de consommateurs Kafka à rejoindre.

    7. (Facultatif) Pour Position de départ, choisissez Dernier pour commencer à lire le flux à partir du dernier enregistrement, Supprimer l’horizon pour commencer au premier enregistrement disponible ou À l’horodatage pour spécifier un horodatage à partir duquel commencer la lecture.

    8. (Facultatif) Pour Authentication (Authentification), choisissez la clé secrète pour vous authentifier auprès des courtiers de votre cluster MSK.

    9. Pour créer le déclencheur dans un état désactivé pour le test (recommandé), désactivez Enable trigger (Activer le déclencheur). Ou, pour activer le déclencheur immédiatement, sélectionnezActiver un déclencheur.

  5. Pour créer le déclencheur, choisissez Add (Ajouter).

Ajout d’un déclencheur Amazon MSK (AWS CLI)

Utilisez les exemples de AWS CLI commandes suivants pour créer et afficher un déclencheur Amazon MSK pour votre fonction Lambda.

Création d'un déclencheur à l'aide du AWS CLI

Exemple — Crée un mappage des sources d’événements pour le cluster qui utilise l’authentification IAM

L'exemple suivant utilise la create-event-source-mapping AWS CLI commande pour mapper une fonction Lambda nommée my-kafka-function à une rubrique Kafka nommée. AWSKafkaTopic La position de départ de la rubrique est définie sur LATEST. Lorsque le cluster utilise l'authentification basée sur les rôles IAM, vous n'avez pas besoin d'objet. SourceAccessConfiguration Exemple :

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Exemple — Crée un mappage des sources d’événements pour le cluster qui utilise l’authentification SASL/SCRAM

Si le cluster utilise l'authentification SASL/SCRAM, vous devez inclure un SourceAccessConfigurationobjet qui spécifie SASL_SCRAM_512_AUTH et un ARN secret du Secrets Manager.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
Exemple — Crée un mappage des sources d’événements pour le cluster qui utilise l’authentification mTLS

Si le cluster utilise l'authentification mTLS, vous devez inclure un SourceAccessConfigurationobjet qui spécifie CLIENT_CERTIFICATE_TLS_AUTH et un ARN secret de Secrets Manager.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

Pour plus d'informations, consultez la documentation de référence de CreateEventSourceMappingl'API.

Affichage de l'état à l'aide du AWS CLI

L'exemple suivant utilise la get-event-source-mapping AWS CLI commande pour décrire l'état du mappage des sources d'événements que vous avez créé.

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Création de mappages de sources d’événements entre comptes

Vous pouvez utiliser la connectivité privée multi-VPC pour connecter une fonction Lambda à un cluster MSK provisionné dans un autre Compte AWS. La connectivité multi-VPC utilise AWS PrivateLink, ce qui permet de maintenir tout le trafic sur le AWS réseau.

Note

Vous ne pouvez pas créer de mappages de sources d’événements entre comptes pour les clusters MSK sans serveur.

Pour créer un mappage des sources d’événements entre comptes, vous devez d’abord configurer la connectivité multi-VPC pour le cluster MSK. Lorsque vous créez le mappage des sources d’événements, utilisez l’ARN de connexion VPC géré au lieu de l’ARN du cluster, comme indiqué dans les exemples suivants. Le CreateEventSourceMappingfonctionnement varie également en fonction du type d'authentification utilisé par le cluster MSK.

Exemple — Crée un mappage des sources d’événements entre comptes pour le cluster qui utilise l’authentification IAM

Lorsque le cluster utilise l'authentification basée sur les rôles IAM, vous n'avez pas besoin d'objet. SourceAccessConfiguration Exemple :

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Exemple — Crée un mappage des sources d’événements entre comptes pour le cluster qui utilise l’authentification SASL/SCRAM

Si le cluster utilise l'authentification SASL/SCRAM, vous devez inclure un SourceAccessConfigurationobjet qui spécifie SASL_SCRAM_512_AUTH et un ARN secret du Secrets Manager.

Il existe deux manières d’utiliser les secrets pour les mappages de sources d’événements Amazon MSK entre comptes avec authentification SASL/SCRAM :

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
Exemple — Crée un mappage des sources d’événements entre comptes pour le cluster qui utilise l’authentification mTLS

Si le cluster utilise l'authentification mTLS, vous devez inclure un SourceAccessConfigurationobjet qui spécifie CLIENT_CERTIFICATE_TLS_AUTH et un ARN secret de Secrets Manager. Le secret peut être stocké dans le compte du cluster ou dans le compte de la fonction Lambda.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Scalabilité automatique de la source d’événements Amazon MSK

Lorsque vous créez initialement une source d’événement Amazon MSK, Lambda alloue un consommateur pour traiter toutes les partitions de la rubrique Kafka. Chaque consommateur dispose de plusieurs processeurs exécutés en parallèle pour gérer des charges de travail accrues. Lambda augmente ou diminue automatiquement le nombre de consommateurs, en fonction de la charge de travail. Pour préserver l’ordre des messages dans chaque partition, le nombre maximum de consommateurs est de un par partition dans la rubrique.

Toutes les minutes, Lambda évalue le décalage de consommateurs de toutes les partitions dans la rubrique. Si le décalage est trop élevé, la partition reçoit des messages plus rapidement que Lambda ne peut les traiter. Si nécessaire, Lambda ajoute ou supprime des consommateurs dans la rubrique. Le processus de mise à l’échelle consistant à ajouter ou à supprimer des consommateurs a lieu dans les trois minutes suivant l’évaluation.

Si votre fonction Lambda cible est limitée, Lambda réduit le nombre de consommateurs. Cette action réduit la charge de travail de la fonction en diminuant le nombre de messages que les consommateurs peuvent échanger avec la fonction.

Pour contrôler le débit de votre rubrique Kafka, consultez la Métrique du décalage de délai Lambda émet pendant que votre fonction traite les enregistrements.

Pour vérifier le nombre d’invocations de fonctions qui se produisent en parallèle, vous pouvez également surveiller les métriques de simultanéité pour votre fonction.

Positions de départ des interrogations et des flux

Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.

  • Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.

  • Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.

Ce comportement signifie que si vous spécifiez LATEST comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON ou AT_TIMESTAMP.

CloudWatch Métriques Amazon

Lambda émet la métrique OffsetLag pendant que votre fonction traite les registres. La valeur de cette métrique est la différence de décalage entre le dernier enregistrement inscrit dans la rubrique source de l’événement Kafka et le dernier enregistrement traité par le groupe de consommateurs de votre fonction. Vous pouvez utiliser OffsetLag pour estimer la latence entre le moment où un enregistrement est ajouté et celui où votre groupe de consommateurs le traite.

Une tendance à la hausse de OffsetLag peut indiquer des problèmes liés aux sondeurs dans le groupe de consommateurs de votre fonction. Pour plus d’informations, consultez Utilisation des métriques de fonction Lambda.

Paramètres de configuration d’Amazon MSK

Tous les types de sources d'événements Lambda partagent les mêmes opérations CreateEventSourceMappinget les mêmes opérations d'UpdateEventSourceMappingAPI. Cependant, seuls certains paramètres s’appliquent à Amazon MSK.

Paramètres de source d’événement qui s’appliquent à Amazon MSK
Paramètre Obligatoire Par défaut Remarques

AmazonManagedKafkaEventSourceConfig

N

Contient le ConsumerGroupId champ, dont la valeur par défaut est unique.

Peut définir uniquement sur Create (Créer)

BatchSize

N

100

Maximum : 10 000.

Activé

N

Activées

EventSourceArn

Y

Peut définir uniquement sur Create (Créer)

FunctionName

Y

FilterCriteria

N

Filtrage des événements Lambda

MaximumBatchingWindowInSeconds

N

500 ms

Comportement de traitement par lots

SourceAccessConfigurations

N

Pas d’informations d’identification

Informations d’identification d’authentification SASL/SCRAM ou CLIENT_CERTIFICATE_TLS_AUTH (TLS mutuel) pour votre source d’événement

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON ou DERNIER

Peut définir uniquement sur Create (Créer)

StartingPositionTimestamp

N

Obligatoire s'il StartingPosition est défini sur AT_TIMESTAMP

Rubriques

O

Nom de rubrique Kafka

Peut définir uniquement sur Create (Créer)