Utilisation AWS Lambda avec Amazon DynamoDB - AWS Lambda

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation AWS Lambda avec Amazon DynamoDB

Note

Si vous souhaitez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.

Vous pouvez utiliser une AWS Lambda fonction pour traiter les enregistrements d'un flux Amazon DynamoDB. DynamoDB Streams vous permet de déclencher une fonction Lambda pour effectuer un travail supplémentaire chaque fois qu’une table DynamoDB est mise à jour.

Lambda lit les enregistrements du flux et invoque votre fonction de manière synchrone avec un événement contenant des enregistrements de flux. Lambda lit les enregistrements par lots et invoque votre fonction pour les traiter.

Exemple d’évènement

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}

Flux d’interrogation et de mise en lots

Lambda interroge les partitions de votre flux DynamoDB en quête d’enregistrements à une fréquence de base de quatre fois par seconde. Lorsque des enregistrements sont disponibles, Lambda invoque votre fonction et attend le résultat. Si le traitement réussit, Lambda reprend l’interrogation jusqu’à recevoir plus d’enregistrements.

Par défaut, Lambda invoque votre fonction dès que des enregistrements sont disponibles. Si le lot que Lambda lit à partir de la source d’événements ne comprend qu’un seul enregistrement, Lambda envoie un seul registre à la fonction. Pour éviter d’invoquer la fonction avec un petit nombre de registres, vous pouvez indiquer à la source d’événement de les mettre en mémoire tampon pendant 5 minutes en configurant une fenêtre de traitement par lots. Avant d’invoquer la fonction, Lambda continue de lire les registres de la source d’événements jusqu’à ce qu’il ait rassemblé un lot complet, que la fenêtre de traitement par lot expire ou que le lot atteigne la limite de charge utile de 6 Mo. Pour plus d’informations, consultez Comportement de traitement par lots.

Avertissement

Les mappages de sources d'événements Lambda traitent chaque événement au moins une fois, et le traitement des enregistrements peut être dupliqué. Pour éviter les problèmes potentiels liés à des événements dupliqués, nous vous recommandons vivement de rendre votre code de fonction idempotent. Pour en savoir plus, consultez Comment rendre ma fonction Lambda idempotente dans le Knowledge Center. AWS

Si votre fonction renvoie une erreur, Lambda réessaie de traiter le lot jusqu’à ce que le traitement réussisse ou que les données expirent. Pour éviter le blocage de partitions, vous pouvez configurer le mappage de source d’événement pour effectuer de nouvelles tentatives avec des lots de plus petite taille, limiter le nombre de tentatives ou ignorer les enregistrements trop anciens. Pour conserver les événements rejetés, vous pouvez configurer le mappage des sources d’événements afin d’envoyer les détails des lots ayant échoué à une file d’attente SQS standard ou à une rubrique SNS standard.

Pour augmenter la simultanéité, vous pouvez également traiter plusieurs lots en parallèle à partir de chaque partition. Lambda peut traiter simultanément jusqu’à 10 lots dans chaque partition. Si vous augmentez le nombre de lots simultanés par partition, Lambda garantit toujours le traitement dans l'ordre au niveau de l'élément (partition et clé de tri).

Configurez le ParallelizationFactorparamètre pour traiter une partition d'un flux de données Kinesis ou DynamoDB avec plusieurs invocations Lambda simultanément. Vous pouvez spécifier le nombre de lots simultanés que Lambda interroge à partir d’une partition via un facteur de parallélisation de 1 (par défaut) à 10. Par exemple, quand vous définissez ParallelizationFactor sur 2, vous pouvez avoir jusqu'à 200 invocations Lambda simultanés pour traiter 100 partitions de données Kinesis (bien que, dans la réalité, la métrique ConcurrentExecutions puisse indiquer une valeur différente). Cela permet d’augmenter le débit de traitement quand le volume de données est volatil et que la valeur du paramètre IteratorAge est élevée.

Vous pouvez également utiliser l'ParallelizationFactoragrégation avec Kinesis. Le comportement du mappage des sources d'événements varie selon que vous utilisez ou non un ventilateur amélioré :

  • Sans ventilation améliorée : tous les événements d'un événement agrégé doivent avoir la même clé de partition. La clé de partition doit également correspondre à celle de l'événement agrégé. Si les événements contenus dans l'événement agrégé ont des clés de partition différentes, Lambda ne peut garantir le traitement ordonné des événements par clé de partition.

  • Avec un fan-out amélioré : Lambda décode d'abord l'événement agrégé en événements individuels. L'événement agrégé peut avoir une clé de partition différente de celle des événements qu'il contient. Cependant, les événements qui ne correspondent pas à la clé de partition sont supprimés et perdus. Lambda ne traite pas ces événements et ne les envoie pas vers une destination de défaillance configurée.

Positions de départ des interrogations et des flux

Sachez que l’interrogation des flux lors des mises à jour et de la création du mappage des sources d’événements est finalement cohérente.

  • Lors de la création du mappage des sources d’événements, le démarrage de l’interrogation des événements depuis le flux peut prendre plusieurs minutes.

  • Lors des mises à jour du mappage des sources d’événements, l’arrêt et le redémarrage de l’interrogation des événements depuis le flux peuvent prendre plusieurs minutes.

Ce comportement signifie que si vous spécifiez LATEST comme position de départ du flux, le mappage des sources d’événements peut manquer des événements lors de la création ou des mises à jour. Pour vous assurer de ne manquer aucun événement, spécifiez la position de départ du flux comme TRIM_HORIZON.

Lecteurs simultanés d’une partition dans DynamoDB Streams

Pour les tables à région unique qui ne sont pas des tables globales, vous pouvez concevoir jusqu’à deux fonctions Lambda pour lire la même partition DynamoDB Streams simultanément. Le dépassement de cette limite peut se traduire par une limitation de la demande. Pour les tables globales, nous vous recommandons de limiter le nombre de fonctions simultanées à une seule pour éviter la limitation des demandes.

Autorisations du rôle d’exécution

La politique AWSLambdaDynamoDBExecutionRole AWS gérée inclut les autorisations dont Lambda a besoin pour lire dans votre flux DynamoDB. Ajoutez cette politique gérée au rôle d'exécution de votre fonction.

Pour envoyer des enregistrements de lots ayant échoué à une file d’attente SQS standard ou à une rubrique SNS standard, votre fonction a besoin d’autorisations supplémentaires. Chaque service de destination nécessite une autorisation différente, comme suit :

Ajouter des autorisations et créer le mappage des sources d'événements

Créez un mappage de source d’événement pour indiquer à Lambda d’envoyer des enregistrements de votre flux à une fonction Lambda. Vous pouvez créer plusieurs mappages de source d’événement pour traiter les mêmes données avec plusieurs fonctions Lambda, ou traiter des éléments de plusieurs flux avec une seule fonction.

Pour configurer votre fonction de manière à lire depuis DynamoDB Streams, associez la politique AWSLambdaDynamoDBExecutionRole AWS gérée à votre rôle d'exécution, puis créez un déclencheur DynamoDB.

Pour ajouter des autorisations et créer un déclencheur
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez le nom d’une fonction.

  3. Choisissez l'onglet Configuration, puis Permissions (Autorisations).

  4. Sous Nom du rôle, choisissez le lien vers votre rôle d'exécution. Ce lien ouvre le rôle dans la console IAM.

    Lien vers le rôle d'exécution
  5. Choisissez Ajouter des autorisations, puis Attacher des politiques.

    Joindre des politiques dans la console IAM
  6. Dans le champ de recherche, entrez AWSLambdaDynamoDBExecutionRole. Ajoutez cette politique à votre rôle d'exécution. Il s'agit d'une politique AWS gérée qui contient les autorisations dont votre fonction a besoin pour lire le flux DynamoDB. Pour plus d'informations sur cette politique, consultez AWSLambdaDynamoDBExecutionRolela référence des politiques AWS gérées.

  7. Retournez à votre fonction dans la console Lambda. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).

    Section de présentation des fonctions de la console Lambda
  8. Choisissez un type de déclencheur.

  9. Configurez les options requises, puis choisissez Add (Ajouter).

Lambda prend en charge les options suivantes pour les sources d'événements DynamoDB :

Options de source d’événement
  • DynamoDB table (Table DynamoDB) – Table DynamoDB à partir de laquelle lire les enregistrements.

  • Batch size (Taille de lot) – Nombre d’enregistrements par lot à envoyer à la fonction, jusqu’à 10 000. Lambda transmet tous les enregistrements du lot à la fonction en une seule invocation, tant que la taille totale des événements ne dépasse pas la limite de charge utile pour une invocation synchrone (6 Mo).

  • Batch window (Fenêtre de traitement par lots) – Intervalle de temps maximum (en secondes) pour collecter des enregistrements avant d’invoquer la fonction.

  • Starting position (Position de départ) – Traiter uniquement les nouveaux enregistrements, ou tous enregistrement existants.

    • Latest (Derniers) – Traiter les nouveaux enregistrements ajoutés au flux.

    • Trim horizon (Supprimer l’horizon) – Traiter tous les enregistrements figurant dans le flux.

    Après le traitement de tous les enregistrements existants, la fonction est à jour et continue à traiter les nouveaux enregistrements.

  • Destination en cas d’échec : une file d’attente SQS standard ou rubrique SNS standard pour les enregistrements qui ne peuvent pas être traités. Quand Lambda écarte un lot d’enregistrements qui est trop ancien ou qui a épuisé toutes les tentatives, il envoie les détails du lot à la file d’attente ou à la rubrique.

  • Retry attempts (Nouvelles tentatives) – Nombre maximum de nouvelles tentatives que Lambda effectue quand la fonction renvoie une erreur. Cela ne s’applique pas aux erreurs ou limitations de service où le lot n’a pas atteint la fonction.

  • Maximum age of record (Âge maximum d’enregistrement) – Âge maximum d’un enregistrement que Lambda envoie à votre fonction.

  • Split batch on error (Fractionner le lot en cas d’erreur) – Quand la fonction renvoie une erreur, diviser le lot en deux avant d’effectuer une nouvelle tentative. Le paramètre de taille de lot d’origine reste inchangé.

  • Concurrent batches per shard (Lots simultanés par partition) – Traite simultanément plusieurs lots de la même partition.

  • Enabled (Activé) – Valeur définie sur VRAI pour activer le mappage de source d’événement. Définissez la valeur sur « false » pour arrêter le traitement des enregistrements. Lambda garde une trace du dernier enregistrement traité et reprend le traitement à ce point lorsque le mappage est réactivé.

Note

Les appels d' GetRecords API invoqués par Lambda dans le cadre des déclencheurs DynamoDB ne vous sont pas facturés.

Pour gérer ultérieurement la configuration de la source d’événement, choisissez le déclencheur dans le concepteur.

API de mappage de la source d’événement

Pour gérer une source d’événement à l’aide de la AWS Command Line Interface (AWS CLI) ou un AWS SDK, vous pouvez utiliser les opérations d’API suivantes :

L'exemple suivant utilise le AWS CLI pour mapper une fonction nommée my-function à un flux DynamoDB spécifié par son Amazon Resource Name (ARN), avec une taille de lot de 500.

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --maximum-batching-window-in-seconds 5 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525

Vous devriez voir la sortie suivante :

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 5, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Configurez des options supplémentaires pour personnaliser la manière dont les lots sont traités et indiquer quand supprimer les enregistrements qui ne peuvent pas être traités. L’exemple suivant met à jour le mappage des sources d’événements afin d’envoyer un enregistrement d’échec à une file d’attente SQS standard après deux tentatives, ou si les enregistrements datent de plus d’une heure.

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

Vous devriez voir la sortie suivante :

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Les paramètres mis à jour sont appliqués de façon asynchrone et ne sont pas reflétés dans la sortie tant que le processus n’est pas terminé. Utilisez la commande get-event-source-mapping pour afficher l’état actuel.

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

Vous devriez voir la sortie suivante :

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2023-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

Pour traiter plusieurs lots simultanément, utilisez l’option --parallelization-factor.

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

Gestion des erreurs

Le mappage de source d’événement qui lit les enregistrements de votre flux DynamoDB invoque votre fonction de manière synchrone et effectue de nouvelles tentatives sur les erreurs. Si Lambda limite la fonction ou renvoie une erreur sans invoquer la fonction, Lambda réessaie jusqu’à ce que les enregistrements expirent ou dépassent l’âge maximum que vous configurez pour le mappage de source d’événement.

Si la fonction reçoit les enregistrements mais renvoie une erreur, Lambda réessaie jusqu’à ce que les enregistrements du lot expirent, dépassent l’âge maximum défini ou atteignent le quota de nouvelles tentatives configuré. Pour les erreurs de fonction, vous pouvez également configurer le mappage de source d’événement de façon à diviser en deux lots un lot ayant échoué. Le fait de réessayer avec des lots de plus petite taille a pour effet d’isoler les enregistrements défectueux et de contourner les problèmes de délai d’expiration. Le fractionnement d’un lot n’est pas pris en compte dans le quota de nouvelles tentatives.

Si les mesures de gestion des erreurs échouent, Lambda ignore les enregistrements et poursuit le traitement des lots du flux. Avec les paramètres par défaut, cela signifie qu’un enregistrement défectueux peut bloquer le traitement sur la partition affectée pendant jusqu’à une journée. Pour éviter cela, configurez le mappage de source d’événement de votre fonction avec un nombre raisonnable de nouvelles tentatives et un âge maximum d’enregistrement correspondant à votre cas d’utilisation.

Pour conserver un enregistrement des lots écartés, configurez une destination d’événement ayant échoué. Lambda envoie un document à la file d’attente ou à la rubrique de destination avec des détails sur le lot.

Configurer une destination pour les registres d’événements ayant échoué
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez une fonction.

  3. Sous Function overview (Vue d’ensemble de la fonction), choisissez Add destination (Ajouter une destination).

  4. Pour Source, choisissez Stream invocation (Invocation de flux).

  5. Pour Stream (Flux), choisissez un flux qui est mappé à la fonction.

  6. Pour Type de destination, choisissez le type de ressource qui reçoit l’enregistrement d’invocation.

  7. Pour Destination, choisissez une ressource.

  8. Choisissez Enregistrer.

L’exemple suivant illustre un enregistrement d’invocation pour un flux DynamoDB.

Exemple Enregistrement d’invocation
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

Vous pouvez utiliser ces informations pour récupérer les enregistrements concernés à partir du flux à des fins de résolution de problèmes. Les enregistrements réels n’étant pas inclus, vous devez les récupérer du flux avant qu’ils expirent et soient perdus.

CloudWatch Métriques Amazon

Lambda émet la métrique IteratorAge lorsque votre fonction termine le traitement d’un lot d’enregistrements. Cette métrique indique l’âge du dernier enregistrement du lot à la fin du traitement. Si votre fonction traite de nouveaux événements, vous pouvez utiliser l’âge de l’itérateur pour estimer la latence entre le moment où un enregistrement est ajouté et celui où la fonction le traite.

Une tendance à la hausse de l’âge de l’itérateur peut indiquer des problèmes liés à votre fonction. Pour plus d’informations, consultez Utilisation des métriques de fonction Lambda.

Fenêtres horaires

Les fonctions Lambda peuvent exécuter des applications de traitement de flux continu. Un flux représente des données illimitées qui circulent en continu dans votre application. Pour analyser les informations provenant de cette entrée de mise à jour continue, vous pouvez lier les enregistrements inclus à l’aide d’une fenêtre de temps définie.

Les fenêtres bascules sont des fenêtres temporelles distinctes qui s’ouvrent et se ferment à intervalles réguliers. Par défaut, les invocations Lambda sont sans état : vous ne pouvez pas les utiliser pour traiter des données sur plusieurs invocations continues sans base de données externe. Cependant, avec les fenêtres bascules, vous pouvez maintenir votre état au long des invocations. Cet état contient le résultat global des messages précédemment traités pour la fenêtre actuelle. Votre état peut être d’un maximum de 1 Mo par partition. S’il dépasse cette taille, Lambda met fin précocement à la fenêtre de traitement.

Chaque enregistrement d’un flux appartient à une fenêtre spécifique. La fonction Lambda traitera chaque enregistrement au moins une fois. Toutefois, elle ne garantit pas un seul traitement pour chaque enregistrement. Dans de rares cas, tels que pour la gestion des erreurs, certains enregistrements peuvent être sujet à de multiples traitements. Les dossiers sont toujours traités dans l’ordre dès la première fois. Si les enregistrements sont traités plusieurs fois, ils peuvent être traités dans le désordre.

Regroupement et traitement

Votre fonction gérée par l’utilisateur est invoquée tant pour l’agrégation que pour le traitement des résultats finaux de celle-ci. Lambda regroupe tous les enregistrements reçus dans la fenêtre. Vous pouvez recevoir ces enregistrements en plusieurs lots, chacun sous forme d’invocation séparée. Chaque invocation reçoit un état. Ainsi, lorsque vous utilisez des fenêtres bascules, votre réponse de fonction Lambda doit contenir une propriété state. Si la réponse ne contient pas de propriété state, Lambda considère qu’il s’agit d’une invocation ayant échoué. Pour satisfaire à cette condition, votre fonction peut renvoyer un objet TimeWindowEventResponse ayant la forme JSON suivante :

Exemple TimeWindowEventResponse values
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Note

Pour les fonctions Java, nous vous recommandons d’utiliser Map<String, String> pour représenter l’état.

À la fin de la fenêtre, l’indicateur isFinalInvokeForWindow est défini sur true pour indiquer qu’il s’agit de l’état final et qu’il est prêt pour le traitement. Après le traitement, la fenêtre et votre invocation final se terminent, puis l’état est supprimé.

À la fin de votre fenêtre, Lambda applique un traitement final pour les actions sur les résultats de l’agrégation. Votre traitement final est invoqué de manière synchrone. Une fois l’invocation réussie, votre fonction contrôle le numéro de séquence et le traitement du flux continue. Si l’invocation échoue, votre fonction Lambda suspend le traitement ultérieur jusqu’à ce que l’invocation soit réussie.

Exemple DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Configuration

Vous pouvez configurer des fenêtres bascule lorsque vous créez ou mettez à jour un mappage de source d’événement. Pour configurer une fenêtre bascule, précisez la fenêtre en secondes. L'exemple de commande suivant AWS Command Line Interface (AWS CLI) crée un mappage des sources d'événements de streaming dont la fenêtre de basculement est de 120 secondes. La fonction Lambda définie pour l’agrégation et le traitement est nommée tumbling-window-example-function.

aws lambda create-event-source-mapping --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda détermine les limites des fenêtres bascule en fonction de l’heure à laquelle les enregistrements ont été insérés dans le flux. Tous les enregistrements ont un horodatage approximatif disponible que Lambda utilise pour déterminer des limites.

Les agrégations de fenêtres bascule ne prennent pas en charge le repartitionnement. Quand la partition prend fin, Lambda considère que la fenêtre de traitement est fermée, et les partitions enfants entament leur propre fenêtre de traitement dans un nouvel état.

Les fenêtres bascule prennent complètement en charge les stratégies de nouvelle tentative existantes maxRetryAttempts et maxRecordAge.

Exemple Handler.py – Agrégation et traitement

La fonction Python suivante montre comment regrouper et traiter votre état final :

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

Signalement des échecs d’articles par lots

Lors de l’utilisation et du traitement de données de streaming à partir d’une source d’événement, par défaut, Lambda n’effectue un point de contrôle sur le numéro de séquence le plus élevé d’un lot que lorsque celui-ci est un succès complet. Lambda traite tous les autres résultats comme un échec complet et recommence à traiter le lot jusqu’à atteindre la limite de nouvelles tentatives. Pour autoriser des succès partiels lors du traitement des lots à partir d’un flux, activez ReportBatchItemFailures. Autoriser des succès partiels peut permettre de réduire le nombre de nouvelles tentatives sur un enregistrement, mais cela n’empêche pas entièrement la possibilité de nouvelles tentatives dans un enregistrement réussi.

Pour activer ReportBatchItemFailures, incluez la valeur enum ReportBatchItemFailures dans la listeFunctionResponseTypes. Cette liste indique quels types de réponse sont activés pour votre fonction. Vous pouvez configurer cette liste lorsque vous créez ou mettez à jour un mappage de source d’événement.

Syntaxe du rapport

Lors de la configuration des rapports d’échec d’articles de lot, la classe StreamsEventResponse est renvoyée avec une liste d’échecs d’articles de lot. Vous pouvez utiliser un objet StreamsEventResponse pour renvoyer le numéro de séquence du premier enregistrement ayant échoué dans le lot. Vous pouvez également créer votre classe personnalisée en utilisant la syntaxe de réponse correcte. La structure JSON suivante montre la syntaxe de réponse requise :

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
Note

Si le tableau batchItemFailures contient plusieurs éléments, Lambda utilise l’enregistrement portant le numéro de séquence le plus bas comme point de contrôle. Lambda réessaie ensuite tous les enregistrements à partir de ce point de contrôle.

Conditions de réussite et d’échec

Lambda traite un lot comme un succès complet si vous renvoyez l’un des éléments suivants :

  • Une liste batchItemFailure vide

  • Une liste batchItemFailure nulle

  • Une EventResponse vide

  • Un nu EventResponse

Lambda traite un lot comme un échec complet si vous renvoyez l’un des éléments suivants :

  • Une chaîne vid itemIdentifier

  • Un itemIdentifier nul

  • Un itemIdentifier avec un nom de clé incorrect

Lambda effectue des nouvelles tentatives en cas d’échec en fonction de votre stratégie de nouvelle tentative.

Diviser un lot

Si votre invocation échoue et que BisectBatchOnFunctionError est activé, le lot est divisé en deux quel que soit votre paramètre ReportBatchItemFailures.

Quand une réponse de succès partiel de lot est reçue et que les paramètres BisectBatchOnFunctionError et ReportBatchItemFailures sont activés, le lot est divisé au numéro de séquence renvoyé, et Lambda n’effectue de nouvelle tentative que sur les enregistrements restants.

Voici quelques exemples de codes de fonctions qui renvoient la liste des identifiants des messages ayant échoué dans le lot :

.NET
AWS SDK for .NET
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signalement des défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>(); StreamsEventResponse streamsEventResponse = new StreamsEventResponse(); foreach (var record in dynamoEvent.Records) { try { var sequenceNumber = record.Dynamodb.SequenceNumber; context.Logger.LogInformation(sequenceNumber); } catch (Exception ex) { context.Logger.LogError(ex.Message); batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber }); } } if (batchItemFailures.Count > 0) { streamsEventResponse.BatchItemFailures = batchItemFailures; } context.Logger.LogInformation("Stream processing complete."); return streamsEventResponse; } }
Go
Kit SDK for Go V2
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type BatchItemFailure struct { ItemIdentifier string `json:"ItemIdentifier"` } type BatchResult struct { BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"` } func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) { var batchItemFailures []BatchItemFailure curRecordSequenceNumber := "" for _, record := range event.Records { // Process your record curRecordSequenceNumber = record.Change.SequenceNumber } if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber}) } batchResult := BatchResult{ BatchItemFailures: batchItemFailures, } return &batchResult, nil } func main() { lambda.Start(HandleRequest) }
Java
SDK pour Java 2.x
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
JavaScript
SDK pour JavaScript (v3)
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de. JavaScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event) => { const records = event.Records; let curRecordSequenceNumber = ""; for (const record of records) { try { // Process your record curRecordSequenceNumber = record.dynamodb.SequenceNumber; } catch (e) { // Return failed record's sequence number return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] }; } } return { batchItemFailures: [] }; };

Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de. TypeScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { DynamoDBBatchItemFailure, DynamoDBStreamEvent } from "aws-lambda"; export const handler = async (event: DynamoDBStreamEvent): Promise<DynamoDBBatchItemFailure[]> => { const batchItemsFailures: DynamoDBBatchItemFailure[] = [] let curRecordSequenceNumber for(const record of event.Records) { curRecordSequenceNumber = record.dynamodb?.SequenceNumber if(curRecordSequenceNumber) { batchItemsFailures.push({ itemIdentifier: curRecordSequenceNumber }) } } return batchItemsFailures }
Python
SDK pour Python (Boto3)
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler des défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Ruby
Kit SDK pour Ruby
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler des défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event:, context:) records = event["Records"] cur_record_sequence_number = "" records.each do |record| begin # Process your record cur_record_sequence_number = record["dynamodb"]["SequenceNumber"] rescue StandardError => e # Return failed record's sequence number return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]} end end {"batchItemFailures" => []} end
Rust
SDK pour Rust
Note

Il y en a plus à ce sujet GitHub. Trouvez l’exemple complet et découvrez comment le configurer et l’exécuter dans le référentiel d’exemples sans serveur.

Signaler les défaillances d'éléments de lot DynamoDB avec Lambda à l'aide de Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Paramètres de configuration Amazon DynamoDB Streams

Tous les types de sources d'événements Lambda partagent les mêmes opérations CreateEventSourceMappinget les mêmes opérations d'UpdateEventSourceMappingAPI. Cependant, seuls certains paramètres s’appliquent à DynamoDB Streams.

Paramètres de la source d’événement qui s’appliquent à DynamoDB Streams
Paramètre Obligatoire Par défaut Remarques

BatchSize

N

100

Maximum : 10 000.

BisectBatchOnFunctionError

N

FAUX

DestinationConfig

N

File d’attente Amazon SQS standard ou destination de rubrique Amazon SNS standard pour les enregistrements ignorés

Activées

N

VRAI

EventSourceArn

Y

ARN du flux de données ou d’un consommateur de flux

FilterCriteria

N

Filtrage des événements Lambda

FunctionName

Y

FunctionResponseTypes

N

Pour permettre à votre fonction de signaler des échecs spécifiques dans un lot, incluez la valeur ReportBatchItemFailures dans FunctionResponseTypes. Pour plus d’informations, consultez Signalement des échecs d’articles par lots.

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1 signifie infini : les enregistrements ayant échoué sont réessayés jusqu'à leur expiration. La limite de conservation des données pour DynamoDB Streams est de 24 heures.

Minimum : -1

Maximum : 604 800

MaximumRetryAttempts

N

-1

-1 signifie infini : les registres qui ont échoué sont réessayés jusqu’à ce que le registre expire.

Minimum : 0

Maximum : 10 000.

ParallelizationFactor

N

1

Maximum : 10

StartingPosition

O

TRIM_HORIZON ou LATEST

TumblingWindowInSeconds

N

Minimum : 0

Maximum : 900