Utilisation de AWS Lambda avec Amazon DynamoDB - AWS Lambda

Utilisation de AWS Lambda avec Amazon DynamoDB

Vous pouvez utiliser une fonction AWS Lambda pour traiter des enregistrements dans un flux Amazon DynamoDB. Avec Flux DynamoDB, vous pouvez déclencher une fonction Lambda pour effectuer des tâches supplémentaires chaque fois qu'une table DynamoDB est mise à jour.

Lambda lit les enregistrements à partir du flux et appelle votre fonction de façon synchrone avec un événement qui contient des enregistrements de flux. Lambda lit les enregistrements sous forme de lots et appelle votre fonction pour traiter les enregistrements d'un lot.

Exemple Événement d'enregistrement Flux DynamoDB

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": eventsourcearn, "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": sourcearn, "eventSource": "aws:dynamodb" }

Lambda interroge les partitions de votre flux DynamoDB pour obtenir des enregistrements à une fréquence de base de quatre fois par seconde. Lorsque des enregistrements sont disponibles, Lambda appelle votre fonction et attend le résultat. Si le traitement réussit, Lambda reprend l'interrogation jusqu'à ce qu'il reçoive plus d'enregistrements.

Par défaut, Lambda appelle votre fonction dès que des enregistrements sont disponibles dans le flux. Si le lot lu par Lambda dans le flux ne comporte qu'un enregistrement, Lambda n'envoie qu'un enregistrement à la fonction. Pour éviter d'appeler la fonction avec un petit nombre d'enregistrements, vous pouvez demander à la source d'événement de les mettre en mémoire tampon pendant cinq minutes maximum en configurant une fenêtre de lot. Avant d'appeler la fonction, Lambda continue de lire les enregistrements du flux jusqu'à avoir collecté un lot complet ou jusqu'à l'expiration de la fenêtre de lot.

Si votre fonction renvoie une erreur, Lambda retente de traiter le lot jusqu'à ce que le traitement réussisse ou que les données expirent. Pour éviter les partitions bloquées, vous pouvez configurer le mappage de source d'événement pour réessayer avec une taille de lot plus petite, limiter le nombre de nouvelles tentatives ou supprimer les enregistrements qui sont trop vieux. Afin de conserver les événements supprimés, vous pouvez configurer le mappage de source d'événement pour envoyer les informations détaillées sur les lots ayant échoué à une file d'attente SQS ou à une rubrique SNS.

Vous pouvez également augmenter la simultanéité en traitant plusieurs lots à partir de chaque partition en parallèle. Lambda peut traiter jusqu'à 10 lots dans chaque partition simultanément. Si vous augmentez le nombre de lots simuItanés par partition, Lambda garantit toujours le traitement dans l'ordre au niveau de la clé de partition.

Configurez le paramètre ParallelizationFactor pour traiter une partition d'un flux de données Kinesis ou DynamoDB avec plusieurs appels Lambda simultanément. Vous pouvez spécifier le nombre de lots simultanés que Lambda interroge dans une partition via un facteur de parallélisation compris entre 1 (valeur par défaut) et 10. Par exemple, lorsque la valeur ParallelizationFactor est définie sur 2, vous pouvez avoir 200 appels Lambda simultanés au maximum pour traiter 100 fragments de données Kinesis. Cela permet d’augmenter le débit de traitement lorsque le volume de données est volatile et que IteratorAge est élevé. Pour de plus amples informations, veuillez consulter les informations relatives aux nouveaux contrôles de mise à l'échelle AWS Lambda pour les sources d'événements Kinesis et DynamoDB.

Autorisations du rôle d'exécution

Lambda a besoin des autorisations suivantes pour gérer les ressources liées à votre flux DynamoDB. Ajoutez-les au rôle d'exécution de votre fonction.

La stratégie gérée AWSLambdaDynamoDBExecutionRole inclut ces autorisations. Pour plus d'informations, consultez la section Rôle d'exécution AWS Lambda.

Pour envoyer les enregistrements des lots ayant échoué à une file d'attente ou à une rubrique, votre fonction a besoin d'autorisations supplémentaires. Chaque service de destination nécessite une autorisation différente, comme suit :

Configuration d'un flux comme source d'événement

Créez un mappage de source d'événement pour indiquer à Lambda d'envoyer des enregistrements à partir de votre flux à une fonction Lambda. Vous pouvez créer plusieurs mappages de source d'événement pour traiter les mêmes données avec plusieurs fonctions Lambda, ou pour traiter des éléments à partir de plusieurs flux avec une seule fonction.

Pour configurer votre fonction afin de lire à partir de Flux DynamoDB dans la console Lambda, créez un déclencheur DynamoDB.

Pour créer un déclencheur

  1. Ouvrez la page des fonctions sur la console Lambda.

  2. Choisissez une fonction.

  3. Sous Function overview (Vue d'ensemble de la fonction), choisissez Add trigger (Ajouter un déclencheur).

  4. Choisissez un type de déclencheur.

  5. Configurez les options requises, puis choisissez Ajouter.

Lambda prend en charge les options suivantes pour les sources d'événements DynamoDB.

Options de source d'événement

  • Table DynamoDB– La table DynamoDB à partir de laquelle lire les enregistrements.

  • Taille de lot –: nombre d'enregistrements à envoyer à la fonction dans chaque lot, jusqu'à 10 000. Lambda transmet tous les enregistrements du lot à la fonction en un seul appel, tant que la taille totale des événements ne dépasse pas la limite de charge utile pour un appel synchrone (6 MB).

  • Batch window (Fenêtre de lot) – Spécifiez l'intervalle de temps maximal pour collecter des enregistrements avant d'appeler la fonction, en secondes.

  • Starting position (Position de départ) –: traitez uniquement les nouveaux enregistrements, ou tous enregistrement existants.

    • Dernier –: traitez les nouveaux enregistrements ajoutés au flux.

    • Trim horizon (Supprimer l'horizon) –: traitez tous les enregistrements du flux.

    Après le traitement de tous les enregistrements existants, la fonction est à jour et continue à traiter les nouveaux enregistrements.

  • On-failure destination (Destination en cas d'échec) – File d'attente SQS ou rubrique SNS pour les enregistrements qui ne peuvent pas être traités. Lorsque Lambda supprime un lot d'enregistrements parce qu'il est trop vieux ou parce que toutes les nouvelles tentatives ont été épuisées, il envoie les détails sur ce lot à la fille d'attente ou à la rubrique.

  • Retry attempts (Nouvelles tentatives) – Nombre maximal de nouvelles tentatives effectuées par Lambda lorsqu'une fonction renvoie une erreur. Cela ne s'applique pas aux limitations ou erreurs de service où le lot n'a pas atteint la fonction.

  • Maximum age of record (Âge maximal de l'enregistrement) – Âge maximal d'un enregistrement que Lambda envoie à votre fonction.

  • Split batch on error (Fractionner le lot en cas d'erreur) – Lorsque la fonction renvoie une erreur, fractionnez le lot en deux avant de réessayer.

  • Concurrent batches per shard (Lots simultanés par partition) – Traitez plusieurs lots simultanément à partir de la même partition.

  • Enabled (Activé) : définissez la valeur sur « true » pour activer le mappage de la source d'événement. Définissez la valeur sur « false » pour arrêter le traitement des enregistrements. Lambda assure le suivi du dernier enregistrement traité et reprend le traitement à ce point lorsque le mappage est réactivé.

Note

Vous n'êtes pas facturé pour les appels d'API GetRecords invoqués par Lambda dans le cadre des déclencheurs DynamoDB.

Pour gérer ultérieurement la configuration de la source d'événement, choisissez le déclencheur dans le concepteur.

API de mappage de la source d'événement

Pour gérer une source d'événements avec le AWS CLI ou le kit AWS SDK, vous pouvez utiliser les opérations d'API suivantes :

L'exemple suivant utilise l'AWS CLI pour mapper une fonction nommée my-function à un flux DynamoDB spécifié par son Amazon Resource Name (ARN), avec une taille de lot égale à 500.

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525

Vous devriez voir la sortie suivante:

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Configurez des options supplémentaires pour personnaliser la manière dont les lots sont traités et pour indiquer quand supprimer les enregistrements qui ne peuvent pas être traités. L'exemple suivant met à jour un mappage de source d'événement pour envoyer un enregistrement d'échec à une file d'attente SQS après deux nouvelles tentatives ou si les enregistrements datent de plus d'une heure.

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

Vous devriez voir cette sortie :

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Les paramètres mis à jour sont appliqués de façon asynchrone et ne sont pas reflétés dans la sortie tant que le processus n'est pas terminé. Utilisez la commande get-event-source-mapping pour afficher le statut actuel.

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

Vous devriez voir cette sortie :

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

Pour traiter plusieurs lots simultanément, utilisez l'option --parallelization-factor.

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

Gestion des erreurs

Le mappage de source d'événement qui lit les enregistrements à partir de votre flux DynamoDB appelle votre fonction de façon synchrone et réessaie en cas d'erreur. Si la fonction est limitée ou si le service Lambda renvoie une erreur sans appeler la fonction, Lambda réessaie jusqu'à ce que les enregistrements expirent ou dépassent l'âge maximal que vous configurez dans le mappage de source d'événement.

Si la fonction reçoit les enregistrements mais renvoie une erreur, Lambda réessaye jusqu'à ce que les enregistrements du lot expirent, dépassent l'âge maximal ou atteignent le quota de nouvelles tentatives configurée. Pour les erreurs de fonction, vous pouvez également configurer le mappage de source d'événement pour fractionner un lot ayant échoué en deux lots. Le fait de réessayer avec des lots plus petits permet d'isoler les enregistrements incorrects et de contourner les problèmes de dépassement de délai. Le fractionnement d'un lot n'est pas comptabilisé dans le quota de nouvelles tentatives.

Si les mesures de gestion des erreurs échouent, Lambda supprime les enregistrements et continue de traiter les lots à partir du flux. Avec les paramètres par défaut, cela signifie qu'un enregistrement incorrect peut bloquer le traitement sur la partition concernée pendant jusqu'à one day. Pour éviter cela, configurez le mappage de source d'événement de votre fonction avec un nombre raisonnable de nouvelles tentatives et un âge maximal des enregistrements adapté à votre cas d'utilisation.

Pour conserver un enregistrement des lots supprimés, configurez une destination en cas d'échec. Lambda envoie un document à la rubrique ou à la file d'attente de cette destination avec des détails sur le lot.

Pour configurer une destination pour enregistrer les événements ayant échoué.

  1. Ouvrez la page des fonctions sur la console Lambda.

  2. Choisissez une fonction.

  3. Sous Function overview (Vue d'ensemble de la fonction), choisissez Add destination (Ajouter une destination).

  4. Pour Source, choisissez Stream invocation (Appel de flux).

  5. Pour Stream (Flux), choisissez un flux qui est mappé à la fonction.

  6. Pour Type de destination, choisissez le type de ressource qui reçoit l'enregistrement d'appel.

  7. Pour Destination, choisissez une ressource.

  8. Choisissez Enregistrer.

L'exemple suivant illustre un enregistrement d'appel pour un flux DynamoDB.

Exemple Enregistrement d'appel

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

Vous pouvez utiliser ces informations pour récupérer les enregistrements concernés à partir du flux afin de résoudre les problèmes. Les enregistrements réels ne sont pas inclus. Vous devez donc traiter cet enregistrement et les récupérer à partir du flux avant qu'ils expirent et soient perdus.

Métriques Amazon CloudWatch

Lambda émet la métrique IteratorAge lorsque votre fonction termine le traitement d'un lot d'enregistrements. Cette métrique indique l'âge du dernier enregistrement du lot à la fin du traitement. Si votre fonction traite de nouveaux événements, vous pouvez utiliser l'âge de l'itérateur pour estimer la latence entre le moment où un enregistrement est ajouté et celui où la fonction le traite.

Une tendance à la hausse de l'âge de l'itérateur peut indiquer des problèmes liés à votre fonction. Pour de plus amples informations, veuillez consulter Utilisation des métriques de fonction AWS Lambda.

Fenêtres horaires

Les fonctions Lambda peuvent exécuter des applications de traitement de flux continu. Un flux représente des données illimitées qui circulent en continu dans votre application. Pour analyser les informations provenant de cette entrée de mise à jour continue, vous pouvez lier les enregistrements inclus à l'aide d'une fenêtre de temps définie.

Les invocations Lambda sont sans état : vous ne pouvez pas les utiliser pour traiter des données sur plusieurs appels continus sans base de données externe. Toutefois, lorsque le fenêtrage est activé, vous pouvez conserver votre état à travers les invocations. Cet état contient le résultat global des messages précédemment traités pour la fenêtre actuelle. Votre état peut être d'un maximum de 1 Mo par partition. S'il dépasse cette taille, Lambda met fin à la fenêtre plus tôt.

Fenêtres bascules

Les fonctions Lambda peuvent regrouper des données à l'aide de fenêtres bascules : des fenêtres horaires distinctes qui s'ouvrent et se ferment à intervalles réguliers. Les fenêtres bascules vous permettent de traiter des sources de données en continu grâce à des fenêtres horaires rapprochées et non superposées.

Chaque enregistrement d'un flux appartient à une fenêtre spécifique. Un enregistrement n'est traité qu'une seule fois, lorsque Lambda traite la fenêtre à laquelle il appartient. Dans chaque fenêtre, vous pouvez effectuer des calculs, tels qu'une somme ou une moyenne, au niveau de la clé de partition dans une partition.

Regroupement et traitement

Votre fonction gérée par l'utilisateur est appelée à la fois pour le regroupement et pour traiter les résultats finaux de ce regroupement. Lambda regroupe tous les enregistrements reçus dans la fenêtre. Vous pouvez recevoir ces enregistrements en plusieurs lots, chacun sous forme d'appel séparé. Chaque appel reçoit un état. Vous pouvez également traiter les enregistrements et renvoyer un nouvel état, qui est transmis lors du prochain appel. Lambda renvoie un TimeWindowEventResponse en JSON dans le format suivant :

Exemple TimeWindowEventReponse values

{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Note

Pour les fonctions Java, nous vous recommandons une utilisation Map<String, String> pour représenter l'état.

À la fin de la fenêtre, l'indicateur isFinalInvokeForWindow est défini sur true pour indiquer qu'il s'agit de l'état final et qu'il est prêt pour le traitement. Après le traitement, la fenêtre et votre appel final se terminent, puis l'état est supprimé.

À la fin de votre fenêtre, Lambda utilise le traitement final pour les actions sur les résultats de regroupement. Votre traitement final est appelé de manière synchrone. Une fois l'appel réussi, votre fonction contrôle le numéro de séquence et le traitement du flux continue. Si l'appel échoue, votre fonction Lambda suspend le traitement ultérieur jusqu'à ce que l'appel soit réussi.

Exemple DynamodbTimeWindowEvent

{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Configuration

Vous pouvez configurer des fenêtres bascule lorsque vous créez ou mettez à jour un mappage de source d'événement. Pour configurer une fenêtre bascule, précisez la fenêtre en secondes. L'exemple de commande AWS Command Line Interface (AWS CLI) suivant crée un mappage de source d'événement en continu dont la fenêtre bascule est de 120 secondes. La fonction Lambda définie pour le regroupement et le traitement est nommée tumbling-window-example-function.

aws lambda create-event-source-mapping --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda détermine les limites des fenêtres bascule en fonction de l'heure à laquelle les enregistrements ont été insérés dans le flux. Tous les enregistrements ont un horodatage approximatif disponible que Lambda utilise dans les déterminations de limite.

Les agrégations de fenêtres bascule ne prennent pas en charge le repartitionnement. Lorsque la partition se termine, Lambda prend en compte la fenêtre fermée, et les partitions enfant lancent leur propre fenêtre dans un nouvel état.

Les fenêtres bascule prennent complètement en charge les stratégies de nouvelle tentative existantes maxRetryAttempts et maxRecordAge.

Exemple Handler.py – Regroupement et traitement

La fonction Python suivante montre comment regrouper et traiter votre état final :

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

Signalement des échecs d'articles par lots

Lors de l’utilisation et du traitement de données en continu à partir d'une source d'événement, Lambda effectue par défaut des points de contrôle vers le numéro de séquence le plus élevé d'un lot uniquement lorsque le lot est un succès complet. Lambda traite tous les autres résultats comme des échecs complets et tente à nouveau de traiter le lot jusqu'à la limite de nouvelle tentative. Pour autoriser des succès partiels lors du traitement des lots à partir d'un flux, activez ReportBatchItemFailures. Autoriser des succès partiels peut permettre de réduire le nombre de nouvelles tentatives sur un enregistrement, mais cela n'empêche pas entièrement la possibilité de nouvelles tentatives dans un enregistrement réussi.

Pour activer ReportBatchItemFailures, incluez la valeur enum ReportBatchItemFailures dans la listeFunctionResponseTypes. Cette liste indique quels types de réponse sont activés pour votre fonction. Vous pouvez configurer cette liste lorsque vous créez ou mettez à jour un mappage de source d'événement.

Syntaxe du rapport

Lors de la configuration des rapports d’échec d'articles de lot, la classe StreamsEventResponse est renvoyée avec une liste d'échecs d'articles de lot. Vous pouvez utiliser un objet StreamsEventResponse pour renvoyer le numéro de séquence du premier enregistrement ayant échoué dans le lot. Vous pouvez également créer votre classe personnalisée en utilisant la syntaxe de réponse correcte. La structure JSON suivante montre la syntaxe de réponse requise :

{ "batchItemFailures": [ { "itemIdentifier": "<id>" } ] }

Conditions de réussite et d'échec

Lambda traite un lot comme un succès complet si vous renvoyez l'un des éléments suivants :

  • Une liste batchItemFailure vide

  • Une liste batchItemFailure nulle

  • Un EventResponse vide

  • Un EventResponse nul

Lambda traite un lot comme un échec complet si vous renvoyez l'un des éléments suivants :

  • Une chaîne itemIdentifier vide

  • Un itemIdentifier nul

  • Un itemIdentifier avec un nom de clé incorrect

Lambda retente les échecs en fonction de votre stratégie de nouvelle tentative.

Diviser un lot

Si votre appel échoue et que BisectBatchOnFunctionError est activé, le lot est divisé en deux quel que soit votre paramètre ReportBatchItemFailures.

Lorsqu'une réponse de succès partiel de lot est reçue et que BisectBatchOnFunctionError et ReportBatchItemFailures sont activées, le lot est divisé au numéro de séquence renvoyé et Lambda n’effectue de nouvelle tentative que sur les enregistrements restants.

Java

Exemple Handler.java – return new StreamEventResponse ()

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public Serializable handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<*>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbEventRecord dynamodbEventRecord : input.getRecords()) { try { //Process your record DynamodbEvent.Record dynamodbRecord = dynamodbEventRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { //Return failed record's sequence number batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
Python

Exemple Handler.py – return batchItemFailures []

def handler(event, context): records = event.get("Records") curRecordSequenceNumber = ""; for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}