Utilisation AWS Lambda avec Amazon Kinesis - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation AWS Lambda avec Amazon Kinesis

Note

Si vous souhaitez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.

Vous pouvez utiliser une AWS Lambda fonction pour traiter les enregistrements d'un flux de données Amazon Kinesis.

Un flux de données Kinesis est un ensemble de partitions. Chaque partition contient une séquence d’enregistrements de données. Un consommateur est une application qui traite les données d’un flux de données Kinesis. Vous pouvez mapper une fonction Lambda à un consommateur à débit partagé (itérateur standard) ou à un consommateur à débit dédié avec diffusion améliorée.

Pour les itérateurs standard, Lambda interroge chaque partition de votre flux Kinesis pour des enregistrements qui utilisent le protocole HTTP. Le mappage de source d’événement partage le débit de lecture avec d’autres utilisateurs de la partition.

Pour réduire la latence et optimiser le débit en lecture, vous pouvez créer un consommateur de flux de données avec diffusion améliorée. Les consommateurs de flux obtiennent une connexion dédiée pour chaque partition qui n’a pas d’impact sur les autres applications lisant sur le flux. Le débit dédié peut aider si vous avez de nombreuses applications lisant les mêmes données, ou si vous retraitez un flux avec de gros enregistrements. Kinesis envoie des enregistrements à Lambda via HTTP/2.

Pour plus d’informations sur les flux de données Kinesis, consultez Lecture de données à partir d’Amazon Kinesis Data Streams.

Exemple d’évènement

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

Flux d’interrogation et de mise en lots

Lambda lit les enregistrements du flux de données et invoque votre fonction de manière synchrone avec un événement contenant des enregistrements de flux. Lambda lit les registres par lots et invoque votre fonction pour les traiter. Chaque lot contient des enregistrements provenant d’un seul flux de données/partition.

Par défaut, Lambda invoque votre fonction dès que des enregistrements sont disponibles. Si le lot que Lambda lit à partir de la source d’événements ne comprend qu’un seul enregistrement, Lambda envoie un seul registre à la fonction. Pour éviter d’invoquer la fonction avec un petit nombre de registres, vous pouvez indiquer à la source d’événement de les mettre en mémoire tampon pendant 5 minutes en configurant une fenêtre de traitement par lots. Avant d’invoquer la fonction, Lambda continue de lire les registres de la source d’événements jusqu’à ce qu’il ait rassemblé un lot complet, que la fenêtre de traitement par lot expire ou que le lot atteigne la limite de charge utile de 6 Mo. Pour plus d’informations, consultez Comportement de traitement par lots.

Avertissement

Les mappages de sources d'événements Lambda traitent chaque événement au moins une fois, et le traitement des enregistrements peut être dupliqué. Pour éviter les problèmes potentiels liés à des événements dupliqués, nous vous recommandons vivement de rendre votre code de fonction idempotent. Pour en savoir plus, consultez Comment rendre ma fonction Lambda idempotente dans le Knowledge Center. AWS

Si votre fonction renvoie une erreur, Lambda réessaie de traiter le lot jusqu’à ce que le traitement réussisse ou que les données expirent. Pour éviter le blocage de partitions, vous pouvez configurer le mappage de source d’événement pour effectuer de nouvelles tentatives avec des lots de plus petite taille, limiter le nombre de tentatives ou ignorer les enregistrements trop anciens. Pour conserver les événements rejetés, vous pouvez configurer le mappage des sources d’événements afin d’envoyer les détails des lots ayant échoué à une file d’attente SQS standard ou à une rubrique SNS standard.

Pour augmenter la simultanéité, vous pouvez également traiter plusieurs lots en parallèle à partir de chaque partition. Lambda peut traiter simultanément jusqu’à 10 lots dans chaque partition. Si vous augmentez le nombre de lots simultanés par partition, Lambda assure toujours un traitement dans l’ordre au niveau clé de partition.

Configurez le ParallelizationFactorparamètre pour traiter une partition d'un flux de données Kinesis ou DynamoDB avec plusieurs invocations Lambda simultanément. Vous pouvez spécifier le nombre de lots simultanés que Lambda interroge à partir d’une partition via un facteur de parallélisation de 1 (par défaut) à 10. Par exemple, quand vous définissez ParallelizationFactor sur 2, vous pouvez avoir jusqu'à 200 invocations Lambda simultanés pour traiter 100 partitions de données Kinesis (bien que, dans la réalité, la métrique ConcurrentExecutions puisse indiquer une valeur différente). Cela permet d’augmenter le débit de traitement quand le volume de données est volatil et que la valeur du paramètre IteratorAge est élevée.

Vous pouvez également utiliser l'ParallelizationFactoragrégation avec Kinesis. Le comportement du mappage des sources d'événements varie selon que vous utilisez ou non un ventilateur amélioré :

  • Sans ventilation améliorée : tous les événements d'un événement agrégé doivent avoir la même clé de partition. La clé de partition doit également correspondre à celle de l'événement agrégé. Si les événements contenus dans l'événement agrégé ont des clés de partition différentes, Lambda ne peut garantir le traitement ordonné des événements par clé de partition.

  • Avec un ventilateur amélioré : Lambda décode d'abord l'événement agrégé en événements individuels. L'événement agrégé peut avoir une clé de partition différente de celle des événements qu'il contient. Cependant, les événements qui ne correspondent pas à la clé de partition sont supprimés et perdus. Lambda ne traite pas ces événements et ne les envoie pas vers une destination de défaillance configurée.

Position de départ du sondage et du stream

Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.

  • Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.

  • Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.

Ce comportement signifie que si vous spécifiez LATEST comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON ou AT_TIMESTAMP.

Configuration de votre fonction et de votre flux de données

Votre fonction Lambda est une application consommateur pour votre flux de données. Elle traite un lot d’enregistrements à la fois à partir de chaque partition. Vous pouvez mapper une fonction Lambda à un flux de données (itérateur standard) ou au consommateur d’un flux (diffusion améliorée).

Pour les itérateurs standard, Lambda interroge chaque partition de votre flux Kinesis afin d’obtenir des enregistrements à une fréquence de base d’une fois par seconde. Lorsque d’autres enregistrements sont disponibles, Lambda continue de traiter les lots jusqu’à ce que la fonction rattrape le flux. Le mappage de source d’événement partage le débit de lecture avec d’autres utilisateurs de la partition.

Pour réduire la latence et d’optimiser le débit en lecture, créez un consommateur de flux de données avec diffusion améliorée. Les consommateurs avec diffusion améliorée obtiennent une connexion dédiée pour chaque partition qui n’a pas d’impact sur les autres applications lisant sur le flux. Les consommateurs de flux utilisent HTTP/2 afin de réduire la latence en transférant les enregistrements à Lambda via une connexion longue durée et en comprimant les en-têtes de requête. Vous pouvez créer un consommateur de flux à l'aide de l'API Kinesis RegisterStreamConsumer.

aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Vous devriez voir la sortie suivante :

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

Pour augmenter la vitesse à laquelle votre fonction traite les enregistrements, ajoutez des partitions à votre flux de données. Lambda traite les enregistrements de chaque partition dans l’ordre. Il arrête de traiter les enregistrements supplémentaires d’une partition si votre fonction renvoie une erreur. Plus de partitions signifient plus de lots traités en une seule fois, ce qui réduit l’impact des erreurs sur la simultanéité.

Si votre fonction ne peut pas augmenter sa capacité pour traiter le nombre total de lots simultanés, demandez une augmentation de quota ou réservez de la simultanéité pour votre fonction.

Autorisations du rôle d’exécution

Lambda a besoin des autorisations suivantes pour gérer les ressources liées à votre flux de données Kinesis. La politique AWSLambdaKinesisExecutionRolegérée inclut ces autorisations. Vous pouvez ajouter cette politique gérée au rôle d'exécution de votre fonction.

Si votre flux de données Kinesis et votre fonction Lambda se trouvent dans des comptes différents, assurez-vous que votre ressource de flux accorde des autorisations kinesis:DescribeStream au rôle ou au compte d'exécution de votre fonction Lambda.

En outre, lorsque vous créez le mappage de vos sources d'événements depuis la console, vous devez disposer des autorisations kinesis : ListStreams et kinesis : ListStreamConsumers.

Pour envoyer des enregistrements de lots ayant échoué à une file d’attente SQS standard ou à une rubrique SNS standard, votre fonction a besoin d’autorisations supplémentaires. Chaque service de destination nécessite une autorisation différente, comme suit :

Ajouter des autorisations et créer le mappage des sources d'événements

Créez un mappage de source d’événement pour indiquer à Lambda d’envoyer des enregistrements de votre flux de données à une fonction Lambda. Vous pouvez créer plusieurs mappages de source d’événement pour traiter les mêmes données avec plusieurs fonctions Lambda, ou pour traiter des éléments en provenance de plusieurs flux de données avec une seule fonction. Lorsque vous traitez des éléments à partir de plusieurs flux de données, chaque lot ne contiendra que des enregistrements provenant d’un seul flux/partition.

Pour configurer votre fonction afin qu'elle lise à partir d'un flux de données Kinesis, ajoutez la politique AWSLambdaKinesisExecutionRole AWS gérée à votre rôle d'exécution et créez un déclencheur Kinesis.

Pour ajouter des autorisations et créer un déclencheur
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez le nom d’une fonction.

  3. Choisissez l'onglet Configuration, puis Permissions (Autorisations).

  4. Sous Nom du rôle, choisissez le lien vers votre rôle d'exécution. Ce lien ouvre le rôle dans la console IAM.

    
              Lien vers le rôle d'exécution
  5. Choisissez Ajouter des autorisations, puis Attacher des politiques.

    
              Joindre des politiques dans la console IAM
  6. Dans le champ de recherche, entrez AWSLambdaKinesisExecutionRole. Ajoutez cette politique à votre rôle d'exécution. Il s'agit d'une politique AWS gérée qui contient les autorisations dont votre fonction a besoin pour lire un flux Kinesis. Pour plus d'informations sur cette politique, consultez AWSLambdaKinesisExecutionRolela référence des politiques AWS gérées.

  7. Retournez à votre fonction dans la console Lambda. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).

    
              Section de présentation des fonctions de la console Lambda
  8. Choisissez un type de déclencheur.

  9. Configurez les options requises, puis choisissez Add (Ajouter).

Lambda prend en charge les options suivantes pour les sources d'événements Kinesis :

Options de source d’événement
  • Kinesis stream (Flux Kinesis) – Flux Kinesis à partir duquel lire les enregistrements.

  • Consumer (Consommateur) (facultatif) – Utilisez un flux consommateur pour lire le flux sur une connexion dédiée.

  • Batch size (Taille de lot) – Nombre d’enregistrements par lot à envoyer à la fonction, jusqu’à 10 000. Lambda transmet tous les enregistrements du lot à la fonction en une seule invocation, tant que la taille totale des événements ne dépasse pas la limite de charge utile pour une invocation synchrone (6 Mo).

  • Batch window (Fenêtre de traitement par lots) – Intervalle de temps maximum (en secondes) pour collecter des enregistrements avant d’invoquer la fonction.

  • Starting position (Position de départ) – Traiter uniquement les nouveaux enregistrements, tous les enregistrements existants ou les enregistrements créés après une certaine date.

    • Latest (Derniers) – Traiter les nouveaux enregistrements ajoutés au flux.

    • Trim horizon (Supprimer l’horizon) – Traiter tous les enregistrements figurant dans le flux.

    • At timestamp (À l’horodatage) – Traitez les enregistrements à partir d’une heure spécifique.

    Après le traitement de tous les enregistrements existants, la fonction est à jour et continue à traiter les nouveaux enregistrements.

  • Destination en cas d’échec : une file d’attente SQS standard ou rubrique SNS standard pour les enregistrements qui ne peuvent pas être traités. Quand Lambda écarte un lot d’enregistrements qui est trop ancien ou qui a épuisé toutes les tentatives, il envoie les détails du lot à la file d’attente ou à la rubrique.

  • Retry attempts (Nouvelles tentatives) – Nombre maximum de nouvelles tentatives que Lambda effectue quand la fonction renvoie une erreur. Cela ne s’applique pas aux erreurs ou limitations de service où le lot n’a pas atteint la fonction.

  • Maximum age of record (Âge maximum d’enregistrement) – Âge maximum d’un enregistrement que Lambda envoie à votre fonction.

  • Split batch on error (Fractionner le lot en cas d’erreur) – Quand la fonction renvoie une erreur, diviser le lot en deux avant d’effectuer une nouvelle tentative. Le paramètre de taille de lot d’origine reste inchangé.

  • Concurrent batches per shard (Lots simultanés par partition) – Traite simultanément plusieurs lots de la même partition.

  • Enabled (Activé) – Valeur définie sur VRAI pour activer le mappage de source d’événement. Définissez la valeur sur « false » pour arrêter le traitement des enregistrements. Lambda garde une trace du dernier enregistrement traité et reprend le traitement à ce point quand il est réactivé.

Note

Kinesis facture chaque partition et, pour les diffusions améliorées, les données lues à partir du flux. Pour obtenir des informations de tarification, consultez Tarification Amazon Kinesis.

Pour gérer ultérieurement la configuration de la source d’événement, choisissez le déclencheur dans le concepteur.

Filtrage d’événements Kinesis

Lorsque vous configurez Kinesis comme source d’événements pour Lambda, vous pouvez utiliser le filtrage des événements pour contrôler les enregistrements de votre flux que Lambda envoie à votre fonction pour traitement. Pour en savoir plus sur l’utilisation du filtrage d’événements Lambda avec Kinesis, consultez Filtrage avec Kinesis.

API de mappage de la source d’événement

Pour gérer une source d’événement à l’aide de la AWS Command Line Interface (AWS CLI) ou d’un AWS SDK, vous pouvez utiliser les opérations d’API suivantes :

Pour créer le mappage des sources d'événements avec le AWS CLI, utilisez la create-event-source-mapping commande. L'exemple suivant utilise le AWS CLI pour mapper une fonction nommée my-function à un flux de données Kinesis. Le flux de données est spécifié par un Amazon Resource Name (ARN), avec une taille de lot de 500, à partir de l’horodatage en temps Unix.

aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Vous devriez voir la sortie suivante:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Pour utiliser un consommateur, spécifiez le consommateur l’ARN au lieu de l’ARN du flux.

Configurez des options supplémentaires pour personnaliser la manière dont les lots sont traités et indiquer quand supprimer les enregistrements qui ne peuvent pas être traités. L’exemple suivant met à jour le mappage des sources d’événements afin d’envoyer un enregistrement d’échec à une file d’attente SQS standard après deux tentatives, ou si les enregistrements datent de plus d’une heure.

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

Vous devriez voir la sortie suivante :

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Les paramètres mis à jour sont appliqués de façon asynchrone et ne sont pas reflétés dans la sortie tant que le processus n’est pas terminé. Utilisez la commande get-event-source-mapping pour afficher l’état actuel.

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

Vous devriez voir la sortie suivante :

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

Pour traiter plusieurs lots simultanément, utilisez l’option --parallelization-factor.

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

Gestion des erreurs

Le mappage de source d’événement qui lit les enregistrements de votre flux Kinesis invoque votre fonction de manière synchrone et effectue de nouvelles tentatives sur les erreurs. Si Lambda limite la fonction ou renvoie une erreur sans invoquer la fonction, Lambda réessaie jusqu’à ce que les enregistrements expirent ou dépassent l’âge maximum que vous configurez pour le mappage de source d’événement.

Si la fonction reçoit les enregistrements mais renvoie une erreur, Lambda réessaie jusqu’à ce que les enregistrements du lot expirent, dépassent l’âge maximum défini ou atteignent le quota de nouvelles tentatives configuré. Pour les erreurs de fonction, vous pouvez également configurer le mappage de source d’événement de façon à diviser en deux lots un lot ayant échoué. Le fait de réessayer avec des lots de plus petite taille a pour effet d’isoler les enregistrements défectueux et de contourner les problèmes de délai d’expiration. Le fractionnement d’un lot n’est pas pris en compte dans le quota de nouvelles tentatives.

Si les mesures de gestion des erreurs échouent, Lambda ignore les enregistrements et poursuit le traitement des lots du flux. Avec les paramètres par défaut, cela signifie qu’un enregistrement défectueux peut bloquer le traitement sur la partition affectée pendant jusqu’à une semaine. Pour éviter cela, configurez le mappage de source d’événement de votre fonction avec un nombre raisonnable de nouvelles tentatives et un âge maximum d’enregistrement correspondant à votre cas d’utilisation.

Pour conserver un enregistrement des lots écartés, configurez une destination d’événement ayant échoué. Lambda envoie un document à la file d’attente ou à la rubrique de destination avec des détails sur le lot.

Configurer une destination pour les registres d’événements ayant échoué
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez une fonction.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add destination (Ajouter une destination).

  4. Pour Source, choisissez Stream invocation (Invocation de flux).

  5. Pour Stream (Flux), choisissez un flux qui est mappé à la fonction.

  6. Pour Type de destination, choisissez le type de ressource qui reçoit l’enregistrement d’invocation.

  7. Pour Destination, choisissez une ressource.

  8. Choisissez Enregistrer.

L’exemple suivant illustre un enregistrement d’invocation pour un flux Kinesis.

Exemple enregistrement d’invocation
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

Vous pouvez utiliser ces informations pour récupérer les enregistrements concernés à partir du flux à des fins de résolution de problèmes. Les enregistrements réels n’étant pas inclus, vous devez les récupérer du flux avant qu’ils expirent et soient perdus.

CloudWatch Métriques Amazon

Lambda émet la métrique IteratorAge lorsque votre fonction termine le traitement d’un lot d’enregistrements. Cette métrique indique l’âge du dernier enregistrement du lot à la fin du traitement. Si votre fonction traite de nouveaux événements, vous pouvez utiliser l’âge de l’itérateur pour estimer la latence entre le moment où un enregistrement est ajouté et celui où la fonction le traite.

Une tendance à la hausse de l’âge de l’itérateur peut indiquer des problèmes liés à votre fonction. Pour plus d’informations, consultez Utilisation des métriques de fonction Lambda.

Fenêtres horaires

Les fonctions Lambda peuvent exécuter des applications de traitement de flux continu. Un flux représente des données illimitées qui circulent en continu dans votre application. Pour analyser les informations provenant de cette entrée de mise à jour continue, vous pouvez lier les enregistrements inclus à l’aide d’une fenêtre de temps définie.

Les fenêtres bascules sont des fenêtres temporelles distinctes qui s’ouvrent et se ferment à intervalles réguliers. Par défaut, les invocations Lambda sont sans état : vous ne pouvez pas les utiliser pour traiter des données sur plusieurs invocations continues sans base de données externe. Cependant, avec les fenêtres bascules, vous pouvez maintenir votre état au long des invocations. Cet état contient le résultat global des messages précédemment traités pour la fenêtre actuelle. Votre état peut être d’un maximum de 1 Mo par partition. S’il dépasse cette taille, Lambda met fin précocement à la fenêtre de traitement.

Chaque enregistrement d’un flux appartient à une fenêtre spécifique. La fonction Lambda traitera chaque enregistrement au moins une fois. Toutefois, elle ne garantit pas un seul traitement pour chaque enregistrement. Dans de rares cas, tels que pour la gestion des erreurs, certains enregistrements peuvent être sujet à de multiples traitements. Les dossiers sont toujours traités dans l’ordre dès la première fois. Si les enregistrements sont traités plusieurs fois, ils peuvent être traités dans le désordre.

Regroupement et traitement

Votre fonction gérée par l’utilisateur est invoquée tant pour l’agrégation que pour le traitement des résultats finaux de celle-ci. Lambda regroupe tous les enregistrements reçus dans la fenêtre. Vous pouvez recevoir ces enregistrements en plusieurs lots, chacun sous forme d’invocation séparée. Chaque invocation reçoit un état. Ainsi, lorsque vous utilisez des fenêtres bascules, votre réponse de fonction Lambda doit contenir une propriété state. Si la réponse ne contient pas de propriété state, Lambda considère qu’il s’agit d’une invocation ayant échoué. Pour satisfaire à cette condition, votre fonction peut renvoyer un objet TimeWindowEventResponse ayant la forme JSON suivante :

Exemple TimeWindowEventResponse values
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Note

Pour les fonctions Java, nous vous recommandons d’utiliser Map<String, String> pour représenter l’état.

À la fin de la fenêtre, l’indicateur isFinalInvokeForWindow est défini sur true pour indiquer qu’il s’agit de l’état final et qu’il est prêt pour le traitement. Après le traitement, la fenêtre et votre invocation final se terminent, puis l’état est supprimé.

À la fin de votre fenêtre, Lambda applique un traitement final pour les actions sur les résultats de l’agrégation. Votre traitement final est invoqué de manière synchrone. Une fois l’invocation réussie, votre fonction contrôle le numéro de séquence et le traitement du flux continue. Si l’invocation échoue, votre fonction Lambda suspend le traitement ultérieur jusqu’à ce que l’invocation soit réussie.

Exemple KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Configuration

Vous pouvez configurer des fenêtres bascule lorsque vous créez ou mettez à jour un mappage de source d’événement. Pour configurer une fenêtre bascule, précisez la fenêtre en secondes. L'exemple de commande suivant AWS Command Line Interface (AWS CLI) crée un mappage des sources d'événements de streaming dont la fenêtre de basculement est de 120 secondes. La fonction Lambda définie pour l’agrégation et le traitement est nommée tumbling-window-example-function.

aws lambda create-event-source-mapping --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda détermine les limites des fenêtres bascule en fonction de l’heure à laquelle les enregistrements ont été insérés dans le flux. Tous les enregistrements ont un horodatage approximatif disponible que Lambda utilise pour déterminer des limites.

Les agrégations de fenêtres bascule ne prennent pas en charge le repartitionnement. Quand la partition prend fin, Lambda considère que la fenêtre de traitement est fermée, et les partitions enfants entament leur propre fenêtre de traitement dans un nouvel état.

Les fenêtres bascule prennent complètement en charge les stratégies de nouvelle tentative existantes maxRetryAttempts et maxRecordAge.

Exemple Handler.py – Agrégation et traitement

La fonction Python suivante montre comment regrouper et traiter votre état final :

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}

Signalement des échecs d’articles par lots

Lors de l’utilisation et du traitement de données de streaming à partir d’une source d’événement, par défaut, Lambda n’effectue un point de contrôle sur le numéro de séquence le plus élevé d’un lot que lorsque celui-ci est un succès complet. Lambda traite tous les autres résultats comme un échec complet et recommence à traiter le lot jusqu’à atteindre la limite de nouvelles tentatives. Pour autoriser des succès partiels lors du traitement des lots à partir d’un flux, activez ReportBatchItemFailures. Autoriser des succès partiels peut permettre de réduire le nombre de nouvelles tentatives sur un enregistrement, mais cela n’empêche pas entièrement la possibilité de nouvelles tentatives dans un enregistrement réussi.

Pour activer ReportBatchItemFailures, incluez la valeur enum ReportBatchItemFailures dans la listeFunctionResponseTypes. Cette liste indique quels types de réponse sont activés pour votre fonction. Vous pouvez configurer cette liste lorsque vous créez ou mettez à jour un mappage de source d’événement.

Syntaxe du rapport

Lors de la configuration des rapports d’échec d’articles de lot, la classe StreamsEventResponse est renvoyée avec une liste d’échecs d’articles de lot. Vous pouvez utiliser un objet StreamsEventResponse pour renvoyer le numéro de séquence du premier enregistrement ayant échoué dans le lot. Vous pouvez également créer votre classe personnalisée en utilisant la syntaxe de réponse correcte. La structure JSON suivante montre la syntaxe de réponse requise :

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
Note

Si le tableau batchItemFailures contient plusieurs éléments, Lambda utilise l’enregistrement portant le numéro de séquence le plus bas comme point de contrôle. Lambda réessaie ensuite tous les enregistrements à partir de ce point de contrôle.

Conditions de réussite et d’échec

Lambda traite un lot comme un succès complet si vous renvoyez l’un des éléments suivants :

  • Une liste batchItemFailure vide

  • Une liste batchItemFailure nulle

  • Une EventResponse vide

  • Un nu EventResponse

Lambda traite un lot comme un échec complet si vous renvoyez l’un des éléments suivants :

  • Une chaîne vid itemIdentifier

  • Un itemIdentifier nul

  • Un itemIdentifier avec un nom de clé incorrect

Lambda effectue des nouvelles tentatives en cas d’échec en fonction de votre stratégie de nouvelle tentative.

Diviser un lot

Si votre invocation échoue et que BisectBatchOnFunctionError est activé, le lot est divisé en deux quel que soit votre paramètre ReportBatchItemFailures.

Quand une réponse de succès partiel de lot est reçue et que les paramètres BisectBatchOnFunctionError et ReportBatchItemFailures sont activés, le lot est divisé au numéro de séquence renvoyé, et Lambda n’effectue de nouvelle tentative que sur les enregistrements restants.

Voici quelques exemples de codes de fonctions qui renvoient la liste des identifiants des messages ayant échoué dans le lot :

.NET
AWS SDK for .NET
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signalement des échecs d’articles par lots Kinesis avec Lambda à l’aide de .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using System.Text.Json.Serialization; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegration; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return new StreamsEventResponse(); } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return new StreamsEventResponse { BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure> { new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber } } }; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); return new StreamsEventResponse(); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } } public class StreamsEventResponse { [JsonPropertyName("batchItemFailures")] public IList<BatchItemFailure> BatchItemFailures { get; set; } public class BatchItemFailure { [JsonPropertyName("itemIdentifier")] public string ItemIdentifier { get; set; } } }
Go
Kit SDK for Go V2
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler les défaillances d'éléments de lot Kinesis avec Lambda à l'aide de Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, record := range kinesisEvent.Records { curRecordSequenceNumber := "" // Process your record if /* Your record processing condition here */ { curRecordSequenceNumber = record.Kinesis.SequenceNumber } // Add a condition to check if the record processing failed if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber}) } } kinesisBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return kinesisBatchResponse, nil } func main() { lambda.Start(handler) }
Java
SDK pour Java 2.x
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signalement des échecs d’articles par lots Kinesis avec Lambda à l’aide de Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> { @Override public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
JavaScript
SDK pour JavaScript (v3)
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signalement des échecs d’articles par lots Kinesis avec Lambda à l’aide de JavaScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } console.log(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

Signaler les défaillances d'éléments de lot Kinesis avec Lambda à l'aide de. TypeScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, KinesisStreamBatchResponse, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<KinesisStreamBatchResponse> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } logger.info(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
PHP
Kit SDK pour PHP
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler des défaillances d'éléments de lot Kinesis avec Lambda à l'aide de PHP.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $kinesisEvent = new KinesisEvent($event); $this->logger->info("Processing records"); $records = $kinesisEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK pour Python (Boto3)
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signalement des échecs d’articles par lots Kinesis avec Lambda à l’aide de Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Ruby
Kit SDK pour Ruby
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler les défaillances d'éléments de lot Kinesis avec Lambda à l'aide de Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) batch_item_failures = [] event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue StandardError => err puts "An error occurred #{err}" # Since we are working with streams, we can return the failed item immediately. # Lambda will immediately begin to retry processing from this failed item onwards. return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] } end end puts "Successfully processed #{event['Records'].length} records." { batchItemFailures: batch_item_failures } end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('utf-8') # Placeholder for actual async work sleep(1) data end
Rust
SDK pour Rust
Note

Il y en a plus sur GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler les défaillances d'éléments de lot Kinesis avec Lambda à l'aide de Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Paramètres de configuration d’Amazon Kinesis

Tous les types de sources d'événements Lambda partagent les mêmes opérations CreateEventSourceMappinget les mêmes opérations d'UpdateEventSourceMappingAPI. Cependant, seuls certains paramètres s’appliquent à Kinesis.

Paramètres de source d’événement qui s’appliquent à Kinesis
Paramètre Obligatoire Par défaut Remarques

BatchSize

N

100

Maximum : 10 000.

BisectBatchOnFunctionError

N

FAUX

DestinationConfig

N

File d’attente Amazon SQS standard ou destination de rubrique Amazon SNS standard pour les enregistrements ignorés

Activées

N

VRAI

EventSourceArn

Y

ARN du flux de données ou d’un consommateur de flux

FunctionName

Y

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1 signifie infini : Lambda ne supprime pas les enregistrements (les paramètres de conservation des données de Kinesis Data Streams s'appliquent toujours)

Minimum : -1

Maximum : 604 800

MaximumRetryAttempts

N

-1

-1 signifie infini : les registres qui ont échoué sont réessayés jusqu’à ce que le registre expire.

Minimum : -1

Maximum : 10 000.

ParallelizationFactor

N

1

Maximum : 10

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON ou DERNIER

StartingPositionTimestamp

N

Valable uniquement s'il StartingPosition est défini sur AT_TIMESTAMP. L’heure à partir de laquelle commencer la lecture, en secondes au format horaire Unix.

TumblingWindowInSeconds

N

Minimum : 0

Maximum : 900