Comment Lambda traite les enregistrements issus d'Amazon Kinesis Data Streams - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Comment Lambda traite les enregistrements issus d'Amazon Kinesis Data Streams

Vous pouvez utiliser une fonction Lambda pour traiter les enregistrements d'un flux de données Amazon Kinesis. Vous pouvez associer une fonction Lambda à un consommateur de débit partagé Kinesis Data Streams (itérateur standard) ou à un consommateur de débit dédié doté d'un ventilateur amélioré. Pour les itérateurs standard, Lambda interroge chaque partition de votre flux Kinesis à la recherche d'enregistrements à l'aide du protocole. HTTP Le mappage de source d’événement partage le débit de lecture avec d’autres utilisateurs de la partition.

Pour plus d’informations sur les flux de données Kinesis, consultez Lecture de données à partir d’Amazon Kinesis Data Streams.

Note

Kinesis facture chaque partition et, pour les diffusions améliorées, les données lues à partir du flux. Pour obtenir des informations de tarification, consultez Tarification Amazon Kinesis.

Flux d’interrogation et de mise en lots

Lambda lit les enregistrements du flux de données et invoque votre fonction de manière synchrone avec un événement contenant des enregistrements de flux. Lambda lit les registres par lots et invoque votre fonction pour les traiter. Chaque lot contient des enregistrements provenant d’un seul flux de données/partition.

Pour les flux de données Kinesis standard, Lambda interroge les partitions de votre flux à la recherche d'enregistrements au rythme d'une fois par seconde pour chaque partition. Pour le fan-out amélioré par Kinesis, Lambda utilise HTTP une connexion /2 pour écouter les enregistrements envoyés par Kinesis. Lorsque des enregistrements sont disponibles, Lambda invoque votre fonction et attend le résultat.

Par défaut, Lambda invoque votre fonction dès que des enregistrements sont disponibles. Si le lot que Lambda lit à partir de la source d’événements ne comprend qu’un seul enregistrement, Lambda envoie un seul registre à la fonction. Pour éviter d’invoquer la fonction avec un petit nombre de registres, vous pouvez indiquer à la source d’événement de les mettre en mémoire tampon pendant 5 minutes en configurant une fenêtre de traitement par lots. Avant d’invoquer la fonction, Lambda continue de lire les registres de la source d’événements jusqu’à ce qu’il ait rassemblé un lot complet, que la fenêtre de traitement par lot expire ou que le lot atteigne la limite de charge utile de 6 Mo. Pour de plus amples informations, veuillez consulter Comportement de traitement par lots.

Avertissement

Les mappages de sources d'événements Lambda traitent chaque événement au moins une fois, et le traitement des enregistrements peut être dupliqué. Pour éviter les problèmes potentiels liés à des événements dupliqués, nous vous recommandons vivement de rendre votre code de fonction idempotent. Pour en savoir plus, consultez Comment rendre ma fonction Lambda idempotente dans le Knowledge Center. AWS

Lambda n'attend pas que les extensions configurées soient terminées avant d'envoyer le lot suivant pour traitement. En d’autres termes, vos extensions peuvent continuer à s’exécuter pendant que Lambda traite le prochain lot d’enregistrements. Cela peut causer des problèmes de limitation si vous enfreignez l’un des paramètres ou l’une des limites de simultanéité de votre compte. Pour détecter s’il s’agit d’un problème éventuel, surveillez vos fonctions et vérifiez si vous observez des métriques de simultanéité plus élevées que prévu pour votre mappage des sources d’événements. En raison de la brièveté des intervalles entre les invocations, Lambda peut brièvement signaler une utilisation simultanée supérieure au nombre de partitions. Cela peut être vrai même pour les fonctions Lambda sans extensions.

Configurez le ParallelizationFactorparamètre pour traiter une partition d'un flux de données Kinesis avec plusieurs invocations Lambda simultanément. Vous pouvez spécifier le nombre de lots simultanés que Lambda interroge à partir d’une partition via un facteur de parallélisation de 1 (par défaut) à 10. Par exemple, lorsque vous définissez la ParallelizationFactor valeur 2, vous pouvez avoir 200 appels Lambda simultanés au maximum pour traiter 100 fragments de données Kinesis (bien qu'en pratique, vous puissiez voir des valeurs différentes pour la métrique). ConcurrentExecutions Cela permet d’augmenter le débit de traitement quand le volume de données est volatil et que la valeur du paramètre IteratorAge est élevée. Lorsque vous augmentez le nombre de lots simultanés par partition, Lambda garantit toujours un traitement dans l'ordre au niveau de la clé de partition.

Vous pouvez également utiliser l'ParallelizationFactoragrégation Kinesis. Le comportement du mappage des sources d'événements varie selon que vous utilisez ou non un ventilateur amélioré :

  • Sans ventilation améliorée : tous les événements d'un événement agrégé doivent avoir la même clé de partition. La clé de partition doit également correspondre à celle de l'événement agrégé. Si les événements contenus dans l'événement agrégé ont des clés de partition différentes, Lambda ne peut garantir le traitement ordonné des événements par clé de partition.

  • Avec un fan-out amélioré : Lambda décode d'abord l'événement agrégé en événements individuels. L'événement agrégé peut avoir une clé de partition différente de celle des événements qu'il contient. Cependant, les événements qui ne correspondent pas à la clé de partition sont supprimés et perdus. Lambda ne traite pas ces événements et ne les envoie pas vers une destination de défaillance configurée.

Exemple d’évènement

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }