DynamoDB Streams et time-to-live - Amazon DynamoDB

DynamoDB Streams et time-to-live

Vous pouvez sauvegarder, ou traiter de toute autre façon, les éléments supprimés par Time-to-live (TTL) en activant Amazon DynamoDB Streams sur la table et en traitant les registres de flux des éléments expirés.

L'enregistrement de flux contient un champ d'identité utilisateur Records[<index>].userIdentity.

Les éléments supprimés par le processus Time-to-live après expiration ont les champs suivants :

  • Records[<index>].userIdentity.type

    "Service"

  • Records[<index>].userIdentity.principalId

    "dynamodb.amazonaws.com"

Le JSON suivant montre la partie pertinente d'un seul enregistrement de flux.

"Records": [ { ... "userIdentity": { "type": "Service", "principalId": "dynamodb.amazonaws.com" } ... } ]

Utilisation de DynamoDB Streams et Lambda pour archiver les éléments supprimés TTL

La combinaison de la DynamoDB time-to-live (TTL), de DynamoDB Streams, et de AWS Lambda peut permettre de simplifier l'archivage des données, de réduire les coûts de stockage de DynamoDB et de réduire la complexité du code. L'utilisation de Lambda comme consommateur de flux offre de nombreux avantages, notamment la réduction des coûts par rapport à d'autres consommateurs tels que Kinesis Client Library (KCL). Vous n'êtes pas facturé pour les appels d'API GetRecords sur votre flux DynamoDB lorsque vous utilisez Lambda pour consommer des événements, et Lambda peut fournir un filtrage d'événements en identifiant les modèles JSON dans un événement de flux. Avec le filtrage de contenu des modèles d'événements, vous pouvez définir jusqu'à cinq filtres différents pour contrôler quels événements sont envoyés à Lambda en vue d'être traités. Cela permet de réduire les appels de vos fonctions Lambda, de simplifier le code et de réduire le coût global.

Alors que DynamoDB Streams contient toutes les modifications de données, telles que les actions Create, Modify et Remove, cela peut entraîner des appels indésirables de votre fonction Lambda d'archivage. Par exemple, supposons que vous ayez une table contenant 2 millions de modifications de données par heure dans le flux, mais que moins de 5 % d'entre elles soient des suppressions d'éléments qui expireront via le processus TTL et devront être archivées. Avec les filtres de source d'événement Lambda, la fonction Lambda n'effectue que 100 000 appels par heure. Il en résulte que le filtrage des événements vous est facturé uniquement pour les appels nécessaires au lieu des 2 millions d'appels que vous auriez sans le filtrage des événements.

Le filtrage des événements est appliqué au mappage de source d'événement Lambda, une ressource qui lit à partir d'un événement choisi (le flux DynamoDB) et appelle une fonction Lambda. Dans le diagramme suivant, vous pouvez voir comment un élément time-to-live supprimé est consommé par une fonction Lambda utilisant des flux et des filtres d'événements.

Modèle de filtrage d'événement DynamoDB time-to-live

L'ajout du JSON suivant à vos critères de filtrage du mappage de source d'événement permet l'appel de votre fonction Lambda uniquement pour les éléments supprimés TTL :

{ "Filters": [ { "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } } } ] }

Création d'un mappage de source d'événement dans AWS Lambda

Utilisez les extraits de code suivants pour créer un mappage de source d'événement filtré que vous pouvez connecter au flux DynamoDB d'une table. Chaque bloc de code inclut le modèle de filtre d'événement.

AWS CLI
aws lambda create-event-source-mapping \ --event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \ --batch-size 10 \ --enabled \ --function-name test_func \ --starting-position LATEST \ --filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
Java
LambdaClient client = LambdaClient.builder() .region(Region.EU_WEST_1) .build(); Filter userIdentity = Filter.builder() .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}") .build(); FilterCriteria filterCriteria = FilterCriteria.builder() .filters(userIdentity) .build(); CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder() .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000") .batchSize(10) .enabled(Boolean.TRUE) .functionName("test_func") .startingPosition("LATEST") .filterCriteria(filterCriteria) .build(); try{ CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest); System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn()); }catch (ServiceException e){ System.out.println(e.getMessage()); }
Node
const client = new LambdaClient({ region: "eu-west-1" }); const input = { EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000", BatchSize: 10, Enabled: true, FunctionName: "test_func", StartingPosition: "LATEST", FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] } } const command = new CreateEventSourceMappingCommand(input); try { const results = await client.send(command); console.log(results); } catch (err) { console.error(err); }
Python
session = boto3.session.Session(region_name = 'eu-west-1') client = session.client('lambda') try: response = client.create_event_source_mapping( EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000', BatchSize=10, Enabled=True, FunctionName='test_func', StartingPosition='LATEST', FilterCriteria={ 'Filters': [ { 'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }, ] } ) print(response) except Exception as e: print(e)
JSON
{ "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }