Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation AWS Lambda avec Amazon Kinesis
Note
Si vous souhaitez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.
Vous pouvez utiliser une AWS Lambda fonction pour traiter les enregistrements d'un flux de données Amazon Kinesis.
Un flux de données Kinesis est un ensemble de partitions. Chaque partition contient une séquence d’enregistrements de données. Un consommateur est une application qui traite les données d’un flux de données Kinesis. Vous pouvez mapper une fonction Lambda à un consommateur à débit partagé (itérateur standard) ou à un consommateur à débit dédié avec diffusion améliorée.
Pour les itérateurs standard, Lambda interroge chaque partition de votre flux Kinesis pour des enregistrements qui utilisent le protocole HTTP. Le mappage de source d’événement partage le débit de lecture avec d’autres utilisateurs de la partition.
Pour réduire la latence et optimiser le débit en lecture, vous pouvez créer un consommateur de flux de données avec diffusion améliorée. Les consommateurs de flux obtiennent une connexion dédiée pour chaque partition qui n’a pas d’impact sur les autres applications lisant sur le flux. Le débit dédié peut aider si vous avez de nombreuses applications lisant les mêmes données, ou si vous retraitez un flux avec de gros enregistrements. Kinesis envoie des enregistrements à Lambda via HTTP/2.
Pour plus d’informations sur les flux de données Kinesis, consultez Lecture de données à partir d’Amazon Kinesis Data Streams.
Sections
- Exemple d’évènement
- Flux d’interrogation et de mise en lots
- Position de départ du sondage et du stream
- Configuration de votre fonction et de votre flux de données
- Autorisations du rôle d’exécution
- Ajouter des autorisations et créer le mappage des sources d'événements
- Filtrage d’événements Kinesis
- API de mappage de la source d’événement
- Gestion des erreurs
- CloudWatch Métriques Amazon
- Fenêtres horaires
- Signalement des échecs d’articles par lots
- Paramètres de configuration d’Amazon Kinesis
- Tutoriel : Utilisation AWS Lambda avec Amazon Kinesis
- Modèle AWS SAM pour une application Kinesis
Exemple d’évènement
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }
Flux d’interrogation et de mise en lots
Lambda lit les enregistrements du flux de données et invoque votre fonction de manière synchrone avec un événement contenant des enregistrements de flux. Lambda lit les registres par lots et invoque votre fonction pour les traiter. Chaque lot contient des enregistrements provenant d’un seul flux de données/partition.
Par défaut, Lambda invoque votre fonction dès que des enregistrements sont disponibles. Si le lot que Lambda lit à partir de la source d’événements ne comprend qu’un seul enregistrement, Lambda envoie un seul registre à la fonction. Pour éviter d’invoquer la fonction avec un petit nombre de registres, vous pouvez indiquer à la source d’événement de les mettre en mémoire tampon pendant 5 minutes en configurant une fenêtre de traitement par lots. Avant d’invoquer la fonction, Lambda continue de lire les registres de la source d’événements jusqu’à ce qu’il ait rassemblé un lot complet, que la fenêtre de traitement par lot expire ou que le lot atteigne la limite de charge utile de 6 Mo. Pour plus d’informations, consultez Comportement de traitement par lots.
Avertissement
Les mappages de sources d'événements Lambda traitent chaque événement au moins une fois, et le traitement des enregistrements peut être dupliqué. Pour éviter les problèmes potentiels liés à des événements dupliqués, nous vous recommandons vivement de rendre votre code de fonction idempotent. Pour en savoir plus, consultez Comment rendre ma fonction Lambda idempotente
Si votre fonction renvoie une erreur, Lambda réessaie de traiter le lot jusqu’à ce que le traitement réussisse ou que les données expirent. Pour éviter le blocage de partitions, vous pouvez configurer le mappage de source d’événement pour effectuer de nouvelles tentatives avec des lots de plus petite taille, limiter le nombre de tentatives ou ignorer les enregistrements trop anciens. Pour conserver les événements rejetés, vous pouvez configurer le mappage des sources d’événements afin d’envoyer les détails des lots ayant échoué à une file d’attente SQS standard ou à une rubrique SNS standard.
Pour augmenter la simultanéité, vous pouvez également traiter plusieurs lots en parallèle à partir de chaque partition. Lambda peut traiter simultanément jusqu’à 10 lots dans chaque partition. Si vous augmentez le nombre de lots simultanés par partition, Lambda assure toujours un traitement dans l’ordre au niveau clé de partition.
Configurez le ParallelizationFactorparamètre pour traiter une partition d'un flux de données Kinesis ou DynamoDB avec plusieurs invocations Lambda simultanément. Vous pouvez spécifier le nombre de lots simultanés que Lambda interroge à partir d’une partition via un facteur de parallélisation de 1 (par défaut) à 10. Par exemple, quand vous définissez ParallelizationFactor
sur 2, vous pouvez avoir jusqu'à 200 invocations Lambda simultanés pour traiter 100 partitions de données Kinesis (bien que, dans la réalité, la métrique ConcurrentExecutions
puisse indiquer une valeur différente). Cela permet d’augmenter le débit de traitement quand le volume de données est volatil et que la valeur du paramètre IteratorAge
est élevée.
Vous pouvez également utiliser l'ParallelizationFactor
agrégation avec Kinesis. Le comportement du mappage des sources d'événements varie selon que vous utilisez ou non un ventilateur amélioré :
-
Sans ventilation améliorée : tous les événements d'un événement agrégé doivent avoir la même clé de partition. La clé de partition doit également correspondre à celle de l'événement agrégé. Si les événements contenus dans l'événement agrégé ont des clés de partition différentes, Lambda ne peut garantir le traitement ordonné des événements par clé de partition.
-
Avec un ventilateur amélioré : Lambda décode d'abord l'événement agrégé en événements individuels. L'événement agrégé peut avoir une clé de partition différente de celle des événements qu'il contient. Cependant, les événements qui ne correspondent pas à la clé de partition sont supprimés et perdus
. Lambda ne traite pas ces événements et ne les envoie pas vers une destination de défaillance configurée.
Position de départ du sondage et du stream
Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.
-
Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.
-
Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.
Ce comportement signifie que si vous spécifiez LATEST
comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON
ou AT_TIMESTAMP
.
Configuration de votre fonction et de votre flux de données
Votre fonction Lambda est une application consommateur pour votre flux de données. Elle traite un lot d’enregistrements à la fois à partir de chaque partition. Vous pouvez mapper une fonction Lambda à un flux de données (itérateur standard) ou au consommateur d’un flux (diffusion améliorée).
Pour les itérateurs standard, Lambda interroge chaque partition de votre flux Kinesis afin d’obtenir des enregistrements à une fréquence de base d’une fois par seconde. Lorsque d’autres enregistrements sont disponibles, Lambda continue de traiter les lots jusqu’à ce que la fonction rattrape le flux. Le mappage de source d’événement partage le débit de lecture avec d’autres utilisateurs de la partition.
Pour réduire la latence et d’optimiser le débit en lecture, créez un consommateur de flux de données avec diffusion améliorée. Les consommateurs avec diffusion améliorée obtiennent une connexion dédiée pour chaque partition qui n’a pas d’impact sur les autres applications lisant sur le flux. Les consommateurs de flux utilisent HTTP/2 afin de réduire la latence en transférant les enregistrements à Lambda via une connexion longue durée et en comprimant les en-têtes de requête. Vous pouvez créer un consommateur de flux à l'aide de l'API Kinesis RegisterStreamConsumer.
aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
Vous devriez voir la sortie suivante :
{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }
Pour augmenter la vitesse à laquelle votre fonction traite les enregistrements, ajoutez des partitions à votre flux de données. Lambda traite les enregistrements de chaque partition dans l’ordre. Il arrête de traiter les enregistrements supplémentaires d’une partition si votre fonction renvoie une erreur. Plus de partitions signifient plus de lots traités en une seule fois, ce qui réduit l’impact des erreurs sur la simultanéité.
Si votre fonction ne peut pas augmenter sa capacité pour traiter le nombre total de lots simultanés, demandez une augmentation de quota ou réservez de la simultanéité pour votre fonction.
Autorisations du rôle d’exécution
Lambda a besoin des autorisations suivantes pour gérer les ressources liées à votre flux de données Kinesis. La politique AWSLambdaKinesisExecutionRolegérée inclut ces autorisations. Vous pouvez ajouter cette politique gérée au rôle d'exécution de votre fonction.
Si votre flux de données Kinesis et votre fonction Lambda se trouvent dans des comptes différents, assurez-vous que votre ressource de flux accorde des autorisations kinesis:DescribeStream
au rôle ou au compte d'exécution de votre fonction Lambda.
En outre, lorsque vous créez le mappage de vos sources d'événements depuis la console, vous devez disposer des autorisations kinesis : ListStreams et kinesis : ListStreamConsumers.
Pour envoyer des enregistrements de lots ayant échoué à une file d’attente SQS standard ou à une rubrique SNS standard, votre fonction a besoin d’autorisations supplémentaires. Chaque service de destination nécessite une autorisation différente, comme suit :
Amazon SQS — en tant que : SendMessage
Amazon SNS – sns:Publish
Ajouter des autorisations et créer le mappage des sources d'événements
Créez un mappage de source d’événement pour indiquer à Lambda d’envoyer des enregistrements de votre flux de données à une fonction Lambda. Vous pouvez créer plusieurs mappages de source d’événement pour traiter les mêmes données avec plusieurs fonctions Lambda, ou pour traiter des éléments en provenance de plusieurs flux de données avec une seule fonction. Lorsque vous traitez des éléments à partir de plusieurs flux de données, chaque lot ne contiendra que des enregistrements provenant d’un seul flux/partition.
Pour configurer votre fonction afin qu'elle lise à partir d'un flux de données Kinesis, ajoutez la politique AWSLambdaKinesisExecutionRole AWS gérée à votre rôle d'exécution et créez un déclencheur Kinesis.
Pour ajouter des autorisations et créer un déclencheur
Ouvrez la page Functions
(Fonctions) de la console Lambda. -
Choisissez le nom d’une fonction.
-
Choisissez l'onglet Configuration, puis Permissions (Autorisations).
-
Sous Nom du rôle, choisissez le lien vers votre rôle d'exécution. Ce lien ouvre le rôle dans la console IAM.
-
Choisissez Ajouter des autorisations, puis Attacher des politiques.
-
Dans le champ de recherche, entrez
AWSLambdaKinesisExecutionRole
. Ajoutez cette politique à votre rôle d'exécution. Il s'agit d'une politique AWS gérée qui contient les autorisations dont votre fonction a besoin pour lire un flux Kinesis. Pour plus d'informations sur cette politique, consultez AWSLambdaKinesisExecutionRolela référence des politiques AWS gérées. -
Retournez à votre fonction dans la console Lambda. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).
-
Choisissez un type de déclencheur.
-
Configurez les options requises, puis choisissez Add (Ajouter).
Lambda prend en charge les options suivantes pour les sources d'événements Kinesis :
Options de source d’événement
-
Kinesis stream (Flux Kinesis) – Flux Kinesis à partir duquel lire les enregistrements.
-
Consumer (Consommateur) (facultatif) – Utilisez un flux consommateur pour lire le flux sur une connexion dédiée.
-
Batch size (Taille de lot) – Nombre d’enregistrements par lot à envoyer à la fonction, jusqu’à 10 000. Lambda transmet tous les enregistrements du lot à la fonction en une seule invocation, tant que la taille totale des événements ne dépasse pas la limite de charge utile pour une invocation synchrone (6 Mo).
Batch window (Fenêtre de traitement par lots) – Intervalle de temps maximum (en secondes) pour collecter des enregistrements avant d’invoquer la fonction.
-
Starting position (Position de départ) – Traiter uniquement les nouveaux enregistrements, tous les enregistrements existants ou les enregistrements créés après une certaine date.
-
Latest (Derniers) – Traiter les nouveaux enregistrements ajoutés au flux.
-
Trim horizon (Supprimer l’horizon) – Traiter tous les enregistrements figurant dans le flux.
-
At timestamp (À l’horodatage) – Traitez les enregistrements à partir d’une heure spécifique.
Après le traitement de tous les enregistrements existants, la fonction est à jour et continue à traiter les nouveaux enregistrements.
-
Destination en cas d’échec : une file d’attente SQS standard ou rubrique SNS standard pour les enregistrements qui ne peuvent pas être traités. Quand Lambda écarte un lot d’enregistrements qui est trop ancien ou qui a épuisé toutes les tentatives, il envoie les détails du lot à la file d’attente ou à la rubrique.
Retry attempts (Nouvelles tentatives) – Nombre maximum de nouvelles tentatives que Lambda effectue quand la fonction renvoie une erreur. Cela ne s’applique pas aux erreurs ou limitations de service où le lot n’a pas atteint la fonction.
Maximum age of record (Âge maximum d’enregistrement) – Âge maximum d’un enregistrement que Lambda envoie à votre fonction.
Split batch on error (Fractionner le lot en cas d’erreur) – Quand la fonction renvoie une erreur, diviser le lot en deux avant d’effectuer une nouvelle tentative. Le paramètre de taille de lot d’origine reste inchangé.
Concurrent batches per shard (Lots simultanés par partition) – Traite simultanément plusieurs lots de la même partition.
-
Enabled (Activé) – Valeur définie sur VRAI pour activer le mappage de source d’événement. Définissez la valeur sur « false » pour arrêter le traitement des enregistrements. Lambda garde une trace du dernier enregistrement traité et reprend le traitement à ce point quand il est réactivé.
Note
Kinesis facture chaque partition et, pour les diffusions améliorées, les données lues à partir du flux. Pour obtenir des informations de tarification, consultez Tarification Amazon Kinesis
Pour gérer ultérieurement la configuration de la source d’événement, choisissez le déclencheur dans le concepteur.
Filtrage d’événements Kinesis
Lorsque vous configurez Kinesis comme source d’événements pour Lambda, vous pouvez utiliser le filtrage des événements pour contrôler les enregistrements de votre flux que Lambda envoie à votre fonction pour traitement. Pour en savoir plus sur l’utilisation du filtrage d’événements Lambda avec Kinesis, consultez Filtrage avec Kinesis.
API de mappage de la source d’événement
Pour gérer une source d’événement à l’aide de la AWS Command Line Interface (AWS CLI) ou d’un AWS SDK
Pour créer le mappage des sources d'événements avec le AWS CLI, utilisez la create-event-source-mapping
commande. L'exemple suivant utilise le AWS CLI pour mapper une fonction nommée my-function
à un flux de données Kinesis. Le flux de données est spécifié par un Amazon Resource Name (ARN), avec une taille de lot de 500, à partir de l’horodatage en temps Unix.
aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:
us-east-2:123456789012:stream/lambda-stream
Vous devriez voir la sortie suivante:
{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }
Pour utiliser un consommateur, spécifiez le consommateur l’ARN au lieu de l’ARN du flux.
Configurez des options supplémentaires pour personnaliser la manière dont les lots sont traités et indiquer quand supprimer les enregistrements qui ne peuvent pas être traités. L’exemple suivant met à jour le mappage des sources d’événements afin d’envoyer un enregistrement d’échec à une file d’attente SQS standard après deux tentatives, ou si les enregistrements datent de plus d’une heure.
aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'
Vous devriez voir la sortie suivante :
{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed",
"State": "Updating", "StateTransitionReason": "User action",
"DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }
Les paramètres mis à jour sont appliqués de façon asynchrone et ne sont pas reflétés dans la sortie tant que le processus n’est pas terminé. Utilisez la commande get-event-source-mapping
pour afficher l’état actuel.
aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b
Vous devriez voir la sortie suivante :
{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action",
"DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600,
"BisectBatchOnFunctionError": false,"MaximumRetryAttempts": 2
}
Pour traiter plusieurs lots simultanément, utilisez l’option --parallelization-factor
.
aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5
Gestion des erreurs
Le mappage de source d’événement qui lit les enregistrements de votre flux Kinesis invoque votre fonction de manière synchrone et effectue de nouvelles tentatives sur les erreurs. Si Lambda limite la fonction ou renvoie une erreur sans invoquer la fonction, Lambda réessaie jusqu’à ce que les enregistrements expirent ou dépassent l’âge maximum que vous configurez pour le mappage de source d’événement.
Si la fonction reçoit les enregistrements mais renvoie une erreur, Lambda réessaie jusqu’à ce que les enregistrements du lot expirent, dépassent l’âge maximum défini ou atteignent le quota de nouvelles tentatives configuré. Pour les erreurs de fonction, vous pouvez également configurer le mappage de source d’événement de façon à diviser en deux lots un lot ayant échoué. Le fait de réessayer avec des lots de plus petite taille a pour effet d’isoler les enregistrements défectueux et de contourner les problèmes de délai d’expiration. Le fractionnement d’un lot n’est pas pris en compte dans le quota de nouvelles tentatives.
Si les mesures de gestion des erreurs échouent, Lambda ignore les enregistrements et poursuit le traitement des lots du flux. Avec les paramètres par défaut, cela signifie qu’un enregistrement défectueux peut bloquer le traitement sur la partition affectée pendant jusqu’à une semaine. Pour éviter cela, configurez le mappage de source d’événement de votre fonction avec un nombre raisonnable de nouvelles tentatives et un âge maximum d’enregistrement correspondant à votre cas d’utilisation.
Pour conserver un enregistrement des lots écartés, configurez une destination d’événement ayant échoué. Lambda envoie un document à la file d’attente ou à la rubrique de destination avec des détails sur le lot.
Configurer une destination pour les registres d’événements ayant échoué
Ouvrez la page Functions
(Fonctions) de la console Lambda. -
Choisissez une fonction.
-
Sous Function overview (Vue d’ensemble de la fonction), choisissez Add destination (Ajouter une destination).
-
Pour Source, choisissez Stream invocation (Invocation de flux).
-
Pour Stream (Flux), choisissez un flux qui est mappé à la fonction.
-
Pour Type de destination, choisissez le type de ressource qui reçoit l’enregistrement d’invocation.
-
Pour Destination, choisissez une ressource.
-
Choisissez Enregistrer.
L’exemple suivant illustre un enregistrement d’invocation pour un flux Kinesis.
Exemple enregistrement d’invocation
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }
Vous pouvez utiliser ces informations pour récupérer les enregistrements concernés à partir du flux à des fins de résolution de problèmes. Les enregistrements réels n’étant pas inclus, vous devez les récupérer du flux avant qu’ils expirent et soient perdus.
CloudWatch Métriques Amazon
Lambda émet la métrique IteratorAge
lorsque votre fonction termine le traitement d’un lot d’enregistrements. Cette métrique indique l’âge du dernier enregistrement du lot à la fin du traitement. Si votre fonction traite de nouveaux événements, vous pouvez utiliser l’âge de l’itérateur pour estimer la latence entre le moment où un enregistrement est ajouté et celui où la fonction le traite.
Une tendance à la hausse de l’âge de l’itérateur peut indiquer des problèmes liés à votre fonction. Pour plus d’informations, consultez Utilisation des métriques de fonction Lambda.
Fenêtres horaires
Les fonctions Lambda peuvent exécuter des applications de traitement de flux continu. Un flux représente des données illimitées qui circulent en continu dans votre application. Pour analyser les informations provenant de cette entrée de mise à jour continue, vous pouvez lier les enregistrements inclus à l’aide d’une fenêtre de temps définie.
Les fenêtres bascules sont des fenêtres temporelles distinctes qui s’ouvrent et se ferment à intervalles réguliers. Par défaut, les invocations Lambda sont sans état : vous ne pouvez pas les utiliser pour traiter des données sur plusieurs invocations continues sans base de données externe. Cependant, avec les fenêtres bascules, vous pouvez maintenir votre état au long des invocations. Cet état contient le résultat global des messages précédemment traités pour la fenêtre actuelle. Votre état peut être d’un maximum de 1 Mo par partition. S’il dépasse cette taille, Lambda met fin précocement à la fenêtre de traitement.
Chaque enregistrement d’un flux appartient à une fenêtre spécifique. La fonction Lambda traitera chaque enregistrement au moins une fois. Toutefois, elle ne garantit pas un seul traitement pour chaque enregistrement. Dans de rares cas, tels que pour la gestion des erreurs, certains enregistrements peuvent être sujet à de multiples traitements. Les dossiers sont toujours traités dans l’ordre dès la première fois. Si les enregistrements sont traités plusieurs fois, ils peuvent être traités dans le désordre.
Regroupement et traitement
Votre fonction gérée par l’utilisateur est invoquée tant pour l’agrégation que pour le traitement des résultats finaux de celle-ci. Lambda regroupe tous les enregistrements reçus dans la fenêtre. Vous pouvez recevoir ces enregistrements en plusieurs lots, chacun sous forme d’invocation séparée. Chaque invocation reçoit un état. Ainsi, lorsque vous utilisez des fenêtres bascules, votre réponse de fonction Lambda doit contenir une propriété state
. Si la réponse ne contient pas de propriété state
, Lambda considère qu’il s’agit d’une invocation ayant échoué. Pour satisfaire à cette condition, votre fonction peut renvoyer un objet TimeWindowEventResponse
ayant la forme JSON suivante :
Exemple TimeWindowEventResponse
values
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Note
Pour les fonctions Java, nous vous recommandons d’utiliser Map<String, String>
pour représenter l’état.
À la fin de la fenêtre, l’indicateur isFinalInvokeForWindow
est défini sur true
pour indiquer qu’il s’agit de l’état final et qu’il est prêt pour le traitement. Après le traitement, la fenêtre et votre invocation final se terminent, puis l’état est supprimé.
À la fin de votre fenêtre, Lambda applique un traitement final pour les actions sur les résultats de l’agrégation. Votre traitement final est invoqué de manière synchrone. Une fois l’invocation réussie, votre fonction contrôle le numéro de séquence et le traitement du flux continue. Si l’invocation échoue, votre fonction Lambda suspend le traitement ultérieur jusqu’à ce que l’invocation soit réussie.
Exemple KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }
Configuration
Vous pouvez configurer des fenêtres bascule lorsque vous créez ou mettez à jour un mappage de source d’événement. Pour configurer une fenêtre bascule, précisez la fenêtre en secondes. L'exemple de commande suivant AWS Command Line Interface (AWS CLI) crée un mappage des sources d'événements de streaming dont la fenêtre de basculement est de 120 secondes. La fonction Lambda définie pour l’agrégation et le traitement est nommée tumbling-window-example-function
.
aws lambda create-event-source-mapping --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds
120
Lambda détermine les limites des fenêtres bascule en fonction de l’heure à laquelle les enregistrements ont été insérés dans le flux. Tous les enregistrements ont un horodatage approximatif disponible que Lambda utilise pour déterminer des limites.
Les agrégations de fenêtres bascule ne prennent pas en charge le repartitionnement. Quand la partition prend fin, Lambda considère que la fenêtre de traitement est fermée, et les partitions enfants entament leur propre fenêtre de traitement dans un nouvel état.
Les fenêtres bascule prennent complètement en charge les stratégies de nouvelle tentative existantes maxRetryAttempts
et maxRecordAge
.
Exemple Handler.py – Agrégation et traitement
La fonction Python suivante montre comment regrouper et traiter votre état final :
def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}
Signalement des échecs d’articles par lots
Lors de l’utilisation et du traitement de données de streaming à partir d’une source d’événement, par défaut, Lambda n’effectue un point de contrôle sur le numéro de séquence le plus élevé d’un lot que lorsque celui-ci est un succès complet. Lambda traite tous les autres résultats comme un échec complet et recommence à traiter le lot jusqu’à atteindre la limite de nouvelles tentatives. Pour autoriser des succès partiels lors du traitement des lots à partir d’un flux, activez ReportBatchItemFailures
. Autoriser des succès partiels peut permettre de réduire le nombre de nouvelles tentatives sur un enregistrement, mais cela n’empêche pas entièrement la possibilité de nouvelles tentatives dans un enregistrement réussi.
Pour activer ReportBatchItemFailures
, incluez la valeur enum ReportBatchItemFailures
dans la listeFunctionResponseTypes
. Cette liste indique quels types de réponse sont activés pour votre fonction. Vous pouvez configurer cette liste lorsque vous créez ou mettez à jour un mappage de source d’événement.
Syntaxe du rapport
Lors de la configuration des rapports d’échec d’articles de lot, la classe StreamsEventResponse
est renvoyée avec une liste d’échecs d’articles de lot. Vous pouvez utiliser un objet StreamsEventResponse
pour renvoyer le numéro de séquence du premier enregistrement ayant échoué dans le lot. Vous pouvez également créer votre classe personnalisée en utilisant la syntaxe de réponse correcte. La structure JSON suivante montre la syntaxe de réponse requise :
{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
Note
Si le tableau batchItemFailures
contient plusieurs éléments, Lambda utilise l’enregistrement portant le numéro de séquence le plus bas comme point de contrôle. Lambda réessaie ensuite tous les enregistrements à partir de ce point de contrôle.
Conditions de réussite et d’échec
Lambda traite un lot comme un succès complet si vous renvoyez l’un des éléments suivants :
-
Une liste
batchItemFailure
vide -
Une liste
batchItemFailure
nulle -
Une
EventResponse
vide -
Un nu
EventResponse
Lambda traite un lot comme un échec complet si vous renvoyez l’un des éléments suivants :
-
Une chaîne vid
itemIdentifier
-
Un
itemIdentifier
nul -
Un
itemIdentifier
avec un nom de clé incorrect
Lambda effectue des nouvelles tentatives en cas d’échec en fonction de votre stratégie de nouvelle tentative.
Diviser un lot
Si votre invocation échoue et que BisectBatchOnFunctionError
est activé, le lot est divisé en deux quel que soit votre paramètre ReportBatchItemFailures
.
Quand une réponse de succès partiel de lot est reçue et que les paramètres BisectBatchOnFunctionError
et ReportBatchItemFailures
sont activés, le lot est divisé au numéro de séquence renvoyé, et Lambda n’effectue de nouvelle tentative que sur les enregistrements restants.
Voici quelques exemples de codes de fonctions qui renvoient la liste des identifiants des messages ayant échoué dans le lot :
Paramètres de configuration d’Amazon Kinesis
Tous les types de sources d'événements Lambda partagent les mêmes opérations CreateEventSourceMappinget les mêmes opérations d'UpdateEventSourceMappingAPI. Cependant, seuls certains paramètres s’appliquent à Kinesis.
Paramètres de source d’événement qui s’appliquent à Kinesis | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Paramètre | Obligatoire | Par défaut | Remarques | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BatchSize |
N |
100 |
Maximum : 10 000. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BisectBatchOnFunctionError |
N |
FAUX |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
DestinationConfig |
N |
File d’attente Amazon SQS standard ou destination de rubrique Amazon SNS standard pour les enregistrements ignorés |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Activées |
N |
VRAI |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
EventSourceArn |
Y |
ARN du flux de données ou d’un consommateur de flux |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FunctionName |
Y |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumBatchingWindowInSeconds |
N |
0 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumRecordAgeInSeconds |
N |
-1 |
-1 signifie infini : Lambda ne supprime pas les enregistrements (les paramètres de conservation des données de Kinesis Data Streams s'appliquent toujours) Minimum : -1 Maximum : 604 800 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumRetryAttempts |
N |
-1 |
-1 signifie infini : les registres qui ont échoué sont réessayés jusqu’à ce que le registre expire. Minimum : -1 Maximum : 10 000. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
ParallelizationFactor |
N |
1 |
Maximum : 10 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
StartingPosition |
Y |
AT_TIMESTAMP, TRIM_HORIZON ou DERNIER |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
StartingPositionTimestamp |
N |
Valable uniquement s'il StartingPosition est défini sur AT_TIMESTAMP. L’heure à partir de laquelle commencer la lecture, en secondes au format horaire Unix. |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
TumblingWindowInSeconds |
N |
Minimum : 0 Maximum : 900 |