Utilisation d'AWS Lambda avec Amazon DynamoDB - AWS Lambda

Utilisation d'AWS Lambda avec Amazon DynamoDB

Note

Si vous voulez envoyer des données à une cible autre qu'une fonction Lambda ou enrichir les données avant de les envoyer, consultez Amazon EventBridge Pipes.

Vous pouvez utiliser une fonction AWS Lambda pour traiter des enregistrements dans un flux Amazon DynamoDB. DynamoDB Streams vous permet de déclencher une fonction Lambda pour effectuer un travail supplémentaire chaque fois qu'une table DynamoDB est mise à jour.

Lambda lit les enregistrements du flux et appelle votre fonction de manière synchrone avec un événement contenant des enregistrements de flux. Lambda lit les enregistrements par lots et appelle votre fonction pour les traiter.

Exemple d'évènement

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-west-2:111122223333:table/TestTable/stream/2015-05-11T21:21:33.291", "eventSource": "aws:dynamodb" } ]}

Flux d'interrogation et de mise en lots

Lambda interroge les partitions de votre flux DynamoDB en quête d'enregistrements à une fréquence de base de quatre fois par seconde. Lorsque des enregistrements sont disponibles, Lambda appelle votre fonction et attend le résultat. Si le traitement réussit, Lambda reprend l'interrogation jusqu'à recevoir plus d'enregistrements.

Par défaut, Lambda appelle votre fonction dès que des enregistrements sont disponibles. Si le lot que Lambda lit à partir de la source d'événements ne comprend qu'un seul enregistrement, Lambda envoie un seul registre à la fonction. Pour éviter d'appeler la fonction avec un petit nombre de registres, vous pouvez indiquer à la source d'événement de les mettre en mémoire tampon pendant 5 minutes en configurant une fenêtre de traitement par lots. Avant d'invoquer la fonction, Lambda continue de lire les registres de la source d'événements jusqu'à ce qu'il ait rassemblé un lot complet, que la fenêtre de traitement par lot expire ou que le lot atteigne la limite de charge utile de 6 Mo. Pour de plus amples informations, veuillez consulter Comportement de traitement par lots.

Si votre fonction renvoie une erreur, Lambda réessaie de traiter le lot jusqu'à ce que le traitement réussisse ou que les données expirent. Pour éviter le blocage de partitions, vous pouvez configurer le mappage de source d'événement pour effectuer de nouvelles tentatives avec des lots de plus petite taille, limiter le nombre de tentatives ou ignorer les enregistrements trop anciens. Pour conserver les événements rejetés, vous pouvez configurer le mappage des sources d'événements afin d'envoyer les détails des lots ayant échoué à une file d'attente SQS standard ou à une rubrique SNS standard.

Vous pouvez également augmenter la simultanéité en traitant plusieurs lots de chaque partition en parallèle. Lambda peut traiter simultanément jusqu'à 10 lots dans chaque partition. Si vous augmentez le nombre de lots simultanés par partition, Lambda assure toujours un traitement dans l'ordre au niveau clé de partition.

Configurez le paramètre ParallelizationFactor pour traiter une partition d'un flux de données Kinesis ou DynamoDB avec plusieurs appels Lambda simultanément. Vous pouvez spécifier le nombre de lots simultanés que Lambda interroge à partir d'une partition via un facteur de parallélisation de 1 (par défaut) à 10. Par exemple, quand vous définissez ParallelizationFactor sur 2, vous pouvez avoir jusqu'à 200 appels Lambda simultanés pour traiter 100 partitions de données Kinesis. Cela permet d'augmenter le débit de traitement quand le volume de données est volatil et que la valeur du paramètre IteratorAge est élevée. Notez que le facteur de parallélisation ne fonctionnera pas si vous utilisez l'agrégation Kinesis. Pour plus d'informations, consultez la section Nouveaux contrôles de mise à l'échelle AWS Lambda pour les sources d'événements Kinesis et DynamoDB. Consultez également l'atelier Traitement des données sans serveur AWS pour obtenir des didacticiels complets.

Lecteurs simultanés d'une partition dans DynamoDB Streams

Pour les tables à région unique qui ne sont pas des tables globales, vous pouvez concevoir jusqu'à deux fonctions Lambda pour lire la même partition DynamoDB Streams simultanément. Le dépassement de cette limite peut se traduire par une limitation de la demande. Pour les tables globales, nous vous recommandons de limiter le nombre de fonctions simultanées à une seule pour éviter la limitation des demandes.

Autorisations du rôle d'exécution

Lambda a besoin des autorisations suivantes pour gérer les ressources liées à votre flux DynamoDB. Ajoutez-les au rôle d'exécution de la fonction.

La stratégie gérée AWSLambdaDynamoDBExecutionRole inclut ces autorisations. Pour de plus amples informations, veuillez consulter Rôle d'exécution Lambda.

Pour envoyer des enregistrements de lots ayant échoué à une file d'attente SQS standard ou à une rubrique SNS standard, votre fonction a besoin d'autorisations supplémentaires. Chaque service de destination nécessite une autorisation différente, comme suit :

Configuration d'un flux comme source d'événement

Créez un mappage de source d'événement pour indiquer à Lambda d'envoyer des enregistrements de votre flux à une fonction Lambda. Vous pouvez créer plusieurs mappages de source d'événement pour traiter les mêmes données avec plusieurs fonctions Lambda, ou traiter des éléments de plusieurs flux avec une seule fonction.

Pour configurer votre fonction afin qu'elle lise à partir de DynamoDB Streams dans la console Lambda, créez un déclencheur DynamoDB.

Pour créer un déclencheur
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez le nom d'une fonction.

  3. Sous Function overview (Présentation de la fonction), choisissez Add trigger (Ajouter un déclencheur).

  4. Choisissez un type de déclencheur.

  5. Configurez les options requises, puis choisissez Add (Ajouter).

Lambda prend en charge les options suivantes pour les sources d'événement DynamoDB.

Options de source d'événement
  • DynamoDB table (Table DynamoDB) – Table DynamoDB à partir de laquelle lire les enregistrements.

  • Batch size (Taille de lot) – Nombre d'enregistrements par lot à envoyer à la fonction, jusqu'à 10 000. Lambda transmet tous les enregistrements du lot à la fonction en un seul appel, tant que la taille totale des événements ne dépasse pas la limite de charge utile pour un appel synchrone (6 Mo).

  • Batch window (Fenêtre de traitement par lots) – Intervalle de temps maximum (en secondes) pour collecter des enregistrements avant d'appeler la fonction.

  • Starting position (Position de départ) – Traiter uniquement les nouveaux enregistrements, ou tous enregistrement existants.

    • Latest (Derniers) – Traiter les nouveaux enregistrements ajoutés au flux.

    • Trim horizon (Supprimer l'horizon) – Traiter tous les enregistrements figurant dans le flux.

    Après le traitement de tous les enregistrements existants, la fonction est à jour et continue à traiter les nouveaux enregistrements.

  • Destination en cas d'échec : une file d'attente SQS standard ou rubrique SNS standard pour les enregistrements qui ne peuvent pas être traités. Quand Lambda écarte un lot d'enregistrements qui est trop ancien ou qui a épuisé toutes les tentatives, il envoie les détails du lot à la file d'attente ou à la rubrique.

  • Retry attempts (Nouvelles tentatives) – Nombre maximum de nouvelles tentatives que Lambda effectue quand la fonction renvoie une erreur. Cela ne s'applique pas aux erreurs ou limitations de service où le lot n'a pas atteint la fonction.

  • Maximum age of record (Âge maximum d'enregistrement) – Âge maximum d'un enregistrement que Lambda envoie à votre fonction.

  • Split batch on error (Fractionner le lot en cas d'erreur) – Quand la fonction renvoie une erreur, diviser le lot en deux avant d'effectuer une nouvelle tentative. Le paramètre de taille de lot d'origine reste inchangé.

  • Concurrent batches per shard (Lots simultanés par partition) – Traite simultanément plusieurs lots de la même partition.

  • Enabled (Activé) – Valeur définie sur VRAI pour activer le mappage de source d'événement. Définissez la valeur sur « false » pour arrêter le traitement des enregistrements. Lambda garde une trace du dernier enregistrement traité et reprend le traitement à ce point lorsque le mappage est réactivé.

Note

Vous n'êtes pas facturé pour les appels d'API GetRecords effectués par Lambda en lien avec les déclencheurs DynamoDB.

Pour gérer ultérieurement la configuration de la source d'événement, choisissez le déclencheur dans le concepteur.

API de mappage de la source d'événement

Pour gérer une source d'événement à l'aide de la AWS Command Line Interface (AWS CLI) ou un AWS SDK, vous pouvez utiliser les opérations d'API suivantes :

L'exemple suivant utilise la AWS CLI pour mapper une fonction nommée my-function à un flux DynamoDB spécifiée par son Amazon Resource Name (ARN), avec une taille de lot de 500.

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --maximum-batching-window-in-seconds 5 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525

Vous devriez voir la sortie suivante :

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 5, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Configurez des options supplémentaires pour personnaliser la manière dont les lots sont traités et indiquer quand supprimer les enregistrements qui ne peuvent pas être traités. L'exemple suivant met à jour le mappage des sources d'événements afin d'envoyer un enregistrement d'échec à une file d'attente SQS standard après deux tentatives, ou si les enregistrements datent de plus d'une heure.

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

Vous devriez voir la sortie suivante :

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Les paramètres mis à jour sont appliqués de façon asynchrone et ne sont pas reflétés dans la sortie tant que le processus n'est pas terminé. Utilisez la commande get-event-source-mapping pour afficher l'état actuel.

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

Vous devriez voir la sortie suivante :

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

Pour traiter plusieurs lots simultanément, utilisez l'option --parallelization-factor.

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

Gestion des erreurs

Le mappage de source d'événement qui lit les enregistrements de votre flux DynamoDB appelle votre fonction de manière synchrone et effectue de nouvelles tentatives sur les erreurs. Si Lambda limite la fonction ou renvoie une erreur sans appeler la fonction, Lambda réessaie jusqu'à ce que les enregistrements expirent ou dépassent l'âge maximum que vous configurez pour le mappage de source d'événement.

Si la fonction reçoit les enregistrements mais renvoie une erreur, Lambda réessaie jusqu'à ce que les enregistrements du lot expirent, dépassent l'âge maximum défini ou atteignent le quota de nouvelles tentatives configuré. Pour les erreurs de fonction, vous pouvez également configurer le mappage de source d'événement de façon à diviser en deux lots un lot ayant échoué. Le fait de réessayer avec des lots de plus petite taille a pour effet d'isoler les enregistrements défectueux et de contourner les problèmes de délai d'expiration. Le fractionnement d'un lot n'est pas pris en compte dans le quota de nouvelles tentatives.

Si les mesures de gestion des erreurs échouent, Lambda ignore les enregistrements et poursuit le traitement des lots du flux. Avec les paramètres par défaut, cela signifie qu'un enregistrement défectueux peut bloquer le traitement sur la partition affectée pendant jusqu'à une journée. Pour éviter cela, configurez le mappage de source d'événement de votre fonction avec un nombre raisonnable de nouvelles tentatives et un âge maximum d'enregistrement correspondant à votre cas d'utilisation.

Pour conserver un enregistrement des lots écartés, configurez une destination d'événement ayant échoué. Lambda envoie un document à la file d'attente ou à la rubrique de destination avec des détails sur le lot.

Configurer une destination pour les registres d'événements ayant échoué
  1. Ouvrez la page Functions (Fonctions) de la console Lambda.

  2. Choisissez une fonction.

  3. Sous Function overview (Vue d'ensemble de la fonction), choisissez Add destination (Ajouter une destination).

  4. Pour Source, choisissez Stream invocation (Appel de flux).

  5. Pour Stream (Flux), choisissez un flux qui est mappé à la fonction.

  6. Pour Type de destination, choisissez le type de ressource qui reçoit l'enregistrement d'appel.

  7. Pour Destination, choisissez une ressource.

  8. Choisissez Enregistrer.

L'exemple suivant illustre un enregistrement d'appel pour un flux DynamoDB.

Exemple Enregistrement d'appel
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

Vous pouvez utiliser ces informations pour récupérer les enregistrements concernés à partir du flux à des fins de résolution de problèmes. Les enregistrements réels n'étant pas inclus, vous devez les récupérer du flux avant qu'ils expirent et soient perdus.

Métriques Amazon CloudWatch

Lambda émet la métrique IteratorAge lorsque votre fonction termine le traitement d'un lot d'enregistrements. Cette métrique indique l'âge du dernier enregistrement du lot à la fin du traitement. Si votre fonction traite de nouveaux événements, vous pouvez utiliser l'âge de l'itérateur pour estimer la latence entre le moment où un enregistrement est ajouté et celui où la fonction le traite.

Une tendance à la hausse de l'âge de l'itérateur peut indiquer des problèmes liés à votre fonction. Pour de plus amples informations, veuillez consulter Utilisation des métriques de fonction Lambda.

Fenêtres horaires

Les fonctions Lambda peuvent exécuter des applications de traitement de flux continu. Un flux représente des données illimitées qui circulent en continu dans votre application. Pour analyser les informations provenant de cette entrée de mise à jour continue, vous pouvez lier les enregistrements inclus à l'aide d'une fenêtre de temps définie.

Les fenêtres bascules sont des fenêtres temporelles distinctes qui s'ouvrent et se ferment à intervalles réguliers. Par défaut, les appels Lambda sont sans état : vous ne pouvez pas les utiliser pour traiter des données sur plusieurs appels continus sans base de données externe. Cependant, avec les fenêtres bascules, vous pouvez maintenir votre état au long des appels. Cet état contient le résultat global des messages précédemment traités pour la fenêtre actuelle. Votre état peut être d'un maximum de 1 Mo par partition. S'il dépasse cette taille, Lambda met fin précocement à la fenêtre de traitement.

Chaque enregistrement d'un flux appartient à une fenêtre spécifique. La fonction Lambda traitera chaque enregistrement au moins une fois. Toutefois, elle ne garantit pas un seul traitement pour chaque enregistrement. Dans de rares cas, tels que pour la gestion des erreurs, certains enregistrements peuvent être sujet à de multiples traitements. Les dossiers sont toujours traités dans l'ordre dès la première fois. Si les enregistrements sont traités plusieurs fois, ils peuvent être traités dans le désordre.

Regroupement et traitement

Votre fonction gérée par l'utilisateur est appelée tant pour l'agrégation que pour le traitement des résultats finaux de celle-ci. Lambda regroupe tous les enregistrements reçus dans la fenêtre. Vous pouvez recevoir ces enregistrements en plusieurs lots, chacun sous forme d'appel séparé. Chaque appel reçoit un état. Ainsi, lorsque vous utilisez des fenêtres bascules, votre réponse de fonction Lambda doit contenir une propriété state. Si la réponse ne contient pas de propriété state, Lambda considère qu'il s'agit d'un appel ayant échoué. Pour satisfaire à cette condition, votre fonction peut renvoyer un objet TimeWindowEventResponse ayant la forme JSON suivante :

Exemple TimeWindowEventResponse values
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Note

Pour les fonctions Java, nous vous recommandons d'utiliser Map<String, String> pour représenter l'état.

À la fin de la fenêtre, l'indicateur isFinalInvokeForWindow est défini sur true pour indiquer qu'il s'agit de l'état final et qu'il est prêt pour le traitement. Après le traitement, la fenêtre et votre appel final se terminent, puis l'état est supprimé.

À la fin de votre fenêtre, Lambda applique un traitement final pour les actions sur les résultats de l'agrégation. Votre traitement final est appelé de manière synchrone. Une fois l'appel réussi, votre fonction contrôle le numéro de séquence et le traitement du flux continue. Si l'appel échoue, votre fonction Lambda suspend le traitement ultérieur jusqu'à ce que l'appel soit réussi.

Exemple DynamoDbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Configuration

Vous pouvez configurer des fenêtres bascule lorsque vous créez ou mettez à jour un mappage de source d'événement. Pour configurer une fenêtre bascule, précisez la fenêtre en secondes. L'exemple de commande AWS Command Line Interface (AWS CLI) suivant crée un mappage de source d'événement de streaming dont la fenêtre bascule est de 120 secondes. La fonction Lambda définie pour l'agrégation et le traitement est nommée tumbling-window-example-function.

aws lambda create-event-source-mapping --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda détermine les limites des fenêtres bascule en fonction de l'heure à laquelle les enregistrements ont été insérés dans le flux. Tous les enregistrements ont un horodatage approximatif disponible que Lambda utilise pour déterminer des limites.

Les agrégations de fenêtres bascule ne prennent pas en charge le repartitionnement. Quand la partition prend fin, Lambda considère que la fenêtre de traitement est fermée, et les partitions enfants entament leur propre fenêtre de traitement dans un nouvel état.

Les fenêtres bascule prennent complètement en charge les stratégies de nouvelle tentative existantes maxRetryAttempts et maxRecordAge.

Exemple Handler.py – Agrégation et traitement

La fonction Python suivante montre comment regrouper et traiter votre état final :

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

Signalement des échecs d'articles par lots

Lors de l'utilisation et du traitement de données de streaming à partir d'une source d'événement, par défaut, Lambda n'effectue un point de contrôle sur le numéro de séquence le plus élevé d'un lot que lorsque celui-ci est un succès complet. Lambda traite tous les autres résultats comme un échec complet et recommence à traiter le lot jusqu'à atteindre la limite de nouvelles tentatives. Pour autoriser des succès partiels lors du traitement des lots à partir d'un flux, activez ReportBatchItemFailures. Autoriser des succès partiels peut permettre de réduire le nombre de nouvelles tentatives sur un enregistrement, mais cela n'empêche pas entièrement la possibilité de nouvelles tentatives dans un enregistrement réussi.

Pour activer ReportBatchItemFailures, incluez la valeur enum ReportBatchItemFailures dans la listeFunctionResponseTypes. Cette liste indique quels types de réponse sont activés pour votre fonction. Vous pouvez configurer cette liste lorsque vous créez ou mettez à jour un mappage de source d'événement.

Syntaxe du rapport

Lors de la configuration des rapports d'échec d'articles de lot, la classe StreamsEventResponse est renvoyée avec une liste d'échecs d'articles de lot. Vous pouvez utiliser un objet StreamsEventResponse pour renvoyer le numéro de séquence du premier enregistrement ayant échoué dans le lot. Vous pouvez également créer votre classe personnalisée en utilisant la syntaxe de réponse correcte. La structure JSON suivante montre la syntaxe de réponse requise :

{ "batchItemFailures": [ { "itemIdentifier": "<id>" } ] }
Note

Si le tableau batchItemFailures contient plusieurs éléments, Lambda utilise l'enregistrement portant le numéro de séquence le plus bas comme point de contrôle. Lambda réessaie ensuite tous les enregistrements à partir de ce point de contrôle.

Conditions de réussite et d'échec

Lambda traite un lot comme un succès complet si vous renvoyez l'un des éléments suivants :

  • Une liste batchItemFailure vide

  • Une liste batchItemFailure nulle

  • Un vid EventResponse

  • Un nu EventResponse

Lambda traite un lot comme un échec complet si vous renvoyez l'un des éléments suivants :

  • Une chaîne vid itemIdentifier

  • Un nu itemIdentifier

  • Un itemIdentifier avec un nom de clé incorrect

Lambda effectue des nouvelles tentatives en cas d'échec en fonction de votre stratégie de nouvelle tentative.

Diviser un lot

Si votre appel échoue et que BisectBatchOnFunctionError est activé, le lot est divisé en deux quel que soit votre paramètre ReportBatchItemFailures.

Quand une réponse de succès partiel de lot est reçue et que les paramètres BisectBatchOnFunctionError et ReportBatchItemFailures sont activés, le lot est divisé au numéro de séquence renvoyé, et Lambda n'effectue de nouvelle tentative que sur les enregistrements restants.

Java
Exemple Handler.java – return new StreamsEventResponse()
import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
Python
Exemple Handler.py – return batchItemFailures[]
def handler(event, context): records = event.get("Records") curRecordSequenceNumber = ""; for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}

Paramètres de configuration Amazon DynamoDB Streams

Tous les types de sources d'événement Lambda partagent les mêmes opérations d'API CreateEventSourceMapping et UpdateEventSourceMapping. Cependant, seuls certains paramètres s'appliquent à DynamoDB Streams

Paramètres de la source d'événement qui s'appliquent à DynamoDB Streams
Paramètre Obligatoire Par défaut Remarques

BatchSize

N

100

Maximum : 10 000.

BisectBatchOnFunctionError

N

FAUX

DestinationConfig

N

File d'attente Amazon SQS standard ou destination de rubrique Amazon SNS standard pour les enregistrements ignorés

Activé

N

VRAI

EventSourceArn

Y

ARN du flux de données ou d'un consommateur de flux

FilterCriteria

N

FunctionName

Y

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1 signifie infini : les registres qui ont échoué sont réessayés jusqu'à ce que le registre expire.

Minimum : -1

Maximum : 604 800

MaximumRetryAttempts

N

-1

-1 signifie infini : les registres qui ont échoué sont réessayés jusqu'à ce que le registre expire.

Minimum : -1

Maximum : 604 800

ParallelizationFactor

N

1

Maximum : 10

StartingPosition

O

TRIM_HORIZON ou LATEST

TumblingWindowInSeconds

N

0Minimum : 7

Maximum : 900