Utilisation de AWS Lambda avec Amazon DynamoDB - AWS Lambda

Utilisation de AWS Lambda avec Amazon DynamoDB

Vous pouvez utiliser une fonction AWS Lambda pour traiter des enregistrements dans un flux Amazon DynamoDB. Avec Flux DynamoDB, vous pouvez déclencher une fonction Lambda pour effectuer des tâches supplémentaires chaque fois qu'une table DynamoDB est mise à jour.

Lambda lit les enregistrements à partir du flux et appelle votre fonction de façon synchrone avec un événement qui contient des enregistrements de flux. Lambda lit les enregistrements sous forme de lots et appelle votre fonction pour traiter les enregistrements d'un lot.

Exemple Événement d'enregistrement Flux DynamoDB

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": eventsourcearn, "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": sourcearn, "eventSource": "aws:dynamodb" }

Lambda interroge les partitions de votre flux DynamoDB pour obtenir des enregistrements à une fréquence de base de quatre fois par seconde. Lorsque des enregistrements sont disponibles, Lambda appelle votre fonction et attend le résultat. Si le traitement réussit, Lambda reprend l'interrogation jusqu'à ce qu'il reçoive plus d'enregistrements.

Par défaut, Lambda appelle votre fonction dès que des enregistrements sont disponibles dans le flux. Si le lot lu par Lambda dans le flux ne comporte qu'un enregistrement, Lambda n'envoie qu'un enregistrement à la fonction. Pour éviter d'appeler la fonction avec un petit nombre d'enregistrements, vous pouvez demander à la source d'événement de les mettre en mémoire tampon pendant cinq minutes maximum en configurant une fenêtre de lot. Avant d'appeler la fonction, Lambda continue de lire les enregistrements du flux jusqu'à avoir collecté un lot complet ou jusqu'à l'expiration de la fenêtre de lot.

Si votre fonction renvoie une erreur, Lambda retente de traiter le lot jusqu'à ce que le traitement réussisse ou que les données expirent. Pour éviter les partitions bloquées, vous pouvez configurer le mappage de source d'événement pour réessayer avec une taille de lot plus petite, limiter le nombre de nouvelles tentatives ou supprimer les enregistrements qui sont trop vieux. Afin de conserver les événements supprimés, vous pouvez configurer le mappage de source d'événement pour envoyer les informations détaillées sur les lots ayant échoué à une file d'attente SQS ou à une rubrique SNS.

Vous pouvez également augmenter la simultanéité en traitant plusieurs lots à partir de chaque partition en parallèle. Lambda peut traiter jusqu'à 10 lots dans chaque partition simultanément. Si vous augmentez le nombre de lots simuItanés par partition, Lambda garantit toujours le traitement dans l'ordre au niveau de la clé de partition.

Configurez le paramètre ParallelizationFactor pour traiter une partition d'un flux de données Kinesis ou DynamoDB avec plusieurs appels Lambda simultanément. Vous pouvez spécifier le nombre de lots simultanés que Lambda interroge dans une partition via un facteur de parallélisation compris entre 1 (valeur par défaut) et 10. Par exemple, lorsque la valeur ParallelizationFactor est définie sur 2, vous pouvez avoir 200 appels Lambda simultanés au maximum pour traiter 100 fragments de données Kinesis. Cela permet d’augmenter le débit de traitement lorsque le volume de données est volatile et que IteratorAge est élevé.

Autorisations du rôle d'exécution

Lambda a besoin des autorisations suivantes pour gérer les ressources liées à votre flux DynamoDB. Ajoutez-les au rôle d'exécution de votre fonction.

La stratégie gérée AWSLambdaDynamoDBExecutionRole inclut ces autorisations. Pour en savoir plus, consultez Rôle d'exécution AWS Lambda.

Pour envoyer les enregistrements des lots ayant échoué à une file d'attente ou à une rubrique, votre fonction a besoin d'autorisations supplémentaires. Chaque service de destination nécessite une autorisation différente, comme suit :

Configuration d'un flux comme source d'événement

Créez un mappage de source d'événement pour indiquer à Lambda d'envoyer des enregistrements à partir de votre flux à une fonction Lambda. Vous pouvez créer plusieurs mappages de source d'événement pour traiter les mêmes données avec plusieurs fonctions Lambda, ou pour traiter des éléments à partir de plusieurs flux avec une seule fonction.

Pour configurer votre fonction afin de lire à partir de Flux DynamoDB dans la console Lambda, créez un déclencheur DynamoDB.

Pour créer un déclencheur

  1. Ouvrez la page des fonctions sur la console Lambda.

  2. Choisissez une fonction.

  3. Sous Designer (Concepteur), choisissez Add trigger (Ajouter un déclencheur).

  4. Choisissez un type de déclencheur.

  5. Configurez les options requises, puis choisissez Ajouter.

Lambda prend en charge les options suivantes pour les sources d'événements DynamoDB.

Options de source d'événement

  • Table DynamoDB– La table DynamoDB à partir de laquelle lire les enregistrements.

  • Taille de lot – Nombre d'enregistrements à envoyer à la fonction dans chaque lot, jusqu'à 1 000. Lambda transmet tous les enregistrements du lot à la fonction en un seul appel, tant que la taille totale des événements ne dépasse pas la limite de charge utile pour un appel synchrone (6 MB).

  • Batch window (Fenêtre de lot) – Spécifiez l'intervalle de temps maximal pour collecter des enregistrements avant d'appeler la fonction, en secondes.

  • Starting position (Position de départ) : traitez uniquement les nouveaux enregistrements, ou tous enregistrement existants.

    • Dernier : traitez les nouveaux enregistrements ajoutés au flux.

    • Trim horizon (Supprimer l'horizon) : traitez tous les enregistrements du flux.

    Après le traitement de tous les enregistrements existants, la fonction est à jour et continue à traiter les nouveaux enregistrements.

  • On-failure destination (Destination en cas d'échec) – File d'attente SQS ou rubrique SNS pour les enregistrements qui ne peuvent pas être traités. Lorsque Lambda supprime un lot d'enregistrements parce qu'il est trop vieux ou parce que toutes les nouvelles tentatives ont été épuisées, il envoie les détails sur ce lot à la fille d'attente ou à la rubrique.

  • Retry attempts (Nouvelles tentatives) – Nombre maximal de nouvelles tentatives effectuées par Lambda lorsqu'une fonction renvoie une erreur. Cela ne s'applique pas aux limitations ou erreurs de service où le lot n'a pas atteint la fonction.

  • Maximum age of record (Âge maximal de l'enregistrement) – Âge maximal d'un enregistrement que Lambda envoie à votre fonction.

  • Split batch on error (Fractionner le lot en cas d'erreur) – Lorsque la fonction renvoie une erreur, fractionnez le lot en deux avant de réessayer.

  • Concurrent batches per shard (Lots simultanés par partition) – Traitez plusieurs lots simultanément à partir de la même partition.

  • Enabled (Activé) : définissez la valeur sur « true » pour activer le mappage de la source d'événement. Définissez la valeur sur « false » pour arrêter le traitement des enregistrements. Lambda assure le suivi du dernier enregistrement traité et reprend le traitement à ce point lorsque le mappage est réactivé.

Note

Vous n'êtes pas facturé pour les appels d'API GetRecords invoqués par Lambda dans le cadre des déclencheurs DynamoDB.

Pour gérer ultérieurement la configuration de la source d'événement, choisissez le déclencheur dans le concepteur.

API de mappage de la source d'événement

Pour gérer une source d'événements avec le AWS CLI ou le kit AWS SDK, vous pouvez utiliser les opérations d'API suivantes :

L'exemple suivant utilise l'AWS CLI pour mapper une fonction nommée my-function à un flux DynamoDB spécifié par son Amazon Resource Name (ARN), avec une taille de lot égale à 500.

$ aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525 { "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Configurez des options supplémentaires pour personnaliser la manière dont les lots sont traités et pour indiquer quand supprimer les enregistrements qui ne peuvent pas être traités. L'exemple suivant met à jour un mappage de source d'événement pour envoyer un enregistrement d'échec à une file d'attente SQS après deux nouvelles tentatives ou si les enregistrements datent de plus d'une heure.

$ aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}' { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Les paramètres mis à jour sont appliqués de façon asynchrone et ne sont pas reflétés dans la sortie tant que le processus n'est pas terminé. Utilisez la commande get-event-source-mapping pour afficher le statut actuel.

$ aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

Pour traiter plusieurs lots simultanément, utilisez l'option --parallelization-factor.

$ aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

Gestion des erreurs

Le mappage de source d'événement qui lit les enregistrements à partir de votre flux DynamoDB appelle votre fonction de façon synchrone et réessaie en cas d'erreur. Si la fonction est limitée ou si le service Lambda renvoie une erreur sans appeler la fonction, Lambda réessaie jusqu'à ce que les enregistrements expirent ou dépassent l'âge maximal que vous configurez dans le mappage de source d'événement.

Si la fonction reçoit les enregistrements mais renvoie une erreur, Lambda réessaye jusqu'à ce que les enregistrements du lot expirent, dépassent l'âge maximal ou atteignent le quota de nouvelles tentatives configurée. Pour les erreurs de fonction, vous pouvez également configurer le mappage de source d'événement pour fractionner un lot ayant échoué en deux lots. Le fait de réessayer avec des lots plus petits permet d'isoler les enregistrements incorrects et de contourner les problèmes de dépassement de délai. Le fractionnement d'un lot n'est pas comptabilisé dans le quota de nouvelles tentatives.

Si les mesures de gestion des erreurs échouent, Lambda supprime les enregistrements et continue de traiter les lots à partir du flux. Avec les paramètres par défaut, cela signifie qu'un enregistrement incorrect peut bloquer le traitement sur la partition concernée pendant jusqu'à one day. Pour éviter cela, configurez le mappage de source d'événement de votre fonction avec un nombre raisonnable de nouvelles tentatives et un âge maximal des enregistrements adapté à votre cas d'utilisation.

Pour conserver un enregistrement des lots supprimés, configurez une destination en cas d'échec. Lambda envoie un document à la rubrique ou à la file d'attente de cette destination avec des détails sur le lot.

Pour configurer une destination pour enregistrer les événements ayant échoué.

  1. Ouvrez la page des fonctions sur la console Lambda.

  2. Choisissez une fonction.

  3. Sous Designer (Concepteur), choisissez Add destination (Ajouter une destination).

  4. Pour Source, choisissez Stream invocation (Appel de flux).

  5. Pour Stream (Flux), choisissez un flux qui est mappé à la fonction.

  6. Pour Type de destination, choisissez le type de ressource qui reçoit l'enregistrement d'appel.

  7. Pour Destination, choisissez une ressource.

  8. Choisissez Enregistrer.

L'exemple suivant illustre un enregistrement d'appel pour un flux DynamoDB.

Exemple Enregistrement d'appel

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

Vous pouvez utiliser ces informations pour récupérer les enregistrements concernés à partir du flux afin de résoudre les problèmes. Les enregistrements réels ne sont pas inclus. Vous devez donc traiter cet enregistrement et les récupérer à partir du flux avant qu'ils expirent et soient perdus.

Métriques Amazon CloudWatch

Lambda émet la métrique IteratorAge lorsque votre fonction termine le traitement d'un lot d'enregistrements. Cette métrique indique l'âge du dernier enregistrement du lot à la fin du traitement. Si votre fonction traite de nouveaux événements, vous pouvez utiliser l'âge de l'itérateur pour estimer la latence entre le moment où un enregistrement est ajouté et celui où la fonction le traite.

Une tendance à la hausse de l'âge de l'itérateur peut indiquer des problèmes liés à votre fonction. Pour de plus amples informations, veuillez consulter Utilisation des métriques de fonction AWS Lambda.