DynamoDB Streams 및 유지 시간(TTL) - Amazon DynamoDB

DynamoDB Streams 및 유지 시간(TTL)

테이블에서 Amazon DynamoDB Streams를 활성화하고 만료된 항목의 스트림 레코드를 처리하여 유지 시간(TTL)에 의해 삭제된 항목을 백업하거나 처리할 수 있습니다.

스트림 레코드에는 사용자 ID 필드 Records[<index>].userIdentity가 포함되어 있습니다.

만료 후 유지 시간(TTL) 프로세스에 의해 삭제된 항목에는 다음 필드가 있습니다.

  • Records[<index>].userIdentity.type

    "Service"

  • Records[<index>].userIdentity.principalId

    "dynamodb.amazonaws.com"

다음 JSON은 단일 스트림 레코드의 해당 부분을 보여 줍니다.

"Records": [ { ... "userIdentity": { "type": "Service", "principalId": "dynamodb.amazonaws.com" } ... } ]

DynamoDB Streams 및 Lambda를 사용하여 TTL 삭제 항목 보관

DynamoDB 유지 시간(TTL), DynamoDB StreamsAWS Lambda를 결합하면 데이터 보관을 간소화하고 DynamoDB 스토리지 비용을 절감하며 코드 복잡성을 줄일 수 있습니다. Lambda를 스트림 소비자로 사용할 경우 많은 이점이 있습니다. 특히 Kinesis Client Library(KCL)와 같은 여타 소비자에 비해 비용이 절감됩니다. Lambda를 사용하여 이벤트를 소비할 때 DynamoDB 스트림에서 GetRecords API 호출에 대한 요금이 부과되지 않으며, Lambda는 스트림 이벤트에서 JSON 패턴을 식별하여 이벤트 필터링을 제공할 수 있습니다. 이벤트 패턴 콘텐츠 필터링을 통해 최대 5가지 필터를 정의하여 처리를 위해 Lambda로 전송되는 이벤트를 제어할 수 있습니다. 이렇게 함으로써 Lambda 함수의 호출 수가 감소하고 코드가 간소화되며 전체 비용이 절감됩니다.

DynamoDB Streams에는 Create, ModifyRemove 작업과 같은 모든 데이터 수정 사항이 포함되어 있지만 이로 인해 아카이브 Lambda 함수가 원치 않게 호출될 수 있습니다. 예를 들어 시간당 200만 개의 데이터 수정 사항이 스트림으로 유입되는 테이블이 있는데 이 중 5% 미만이 TTL 프로세스를 통해 만료되고 보관해야 하는 삭제 항목이라고 가정해 보겠습니다. Lambda 이벤트 소스 필터를 사용하면 Lambda 함수가 시간당 10만 회만 호출합니다. 이벤트 필터링의 결과로, 이벤트 필터링 없이 수행될 200만 회의 호출 대신 필요한 호출에 대해서만 요금이 부과됩니다.

이벤트 필터링은 선택한 이벤트(DynamoDB 스트림)에서 읽기를 수행하고 Lambda 함수를 호출하는 리소스인 Lambda 이벤트 소스 매핑에 적용됩니다. 다음 다이어그램에서는 스트림 및 이벤트 필터를 사용하여 Lambda 함수에 의해 유지 시간(TTL) 삭제 항목이 소비되는 방법을 볼 수 있습니다.

DynamoDB 유지 시간(TTL) 이벤트 필터 패턴

이벤트 소스 매핑 필터 기준에 다음 JSON을 추가하면 TTL 삭제 항목에 대해서만 Lambda 함수를 호출할 수 있습니다.

{ "Filters": [ { "Pattern": { "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } } } ] }

AWS Lambda 이벤트 소스 매핑 생성

다음 코드 조각을 사용하여 테이블의 DynamoDB 스트림에 연결할 수 있는 필터링된 이벤트 소스 매핑을 생성합니다. 각 코드 블록에는 이벤트 필터 패턴이 포함됩니다.

AWS CLI
aws lambda create-event-source-mapping \ --event-source-arn 'arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000' \ --batch-size 10 \ --enabled \ --function-name test_func \ --starting-position LATEST \ --filter-criteria '{"Filters": [{"Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}"}]}'
Java
LambdaClient client = LambdaClient.builder() .region(Region.EU_WEST_1) .build(); Filter userIdentity = Filter.builder() .pattern("{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}") .build(); FilterCriteria filterCriteria = FilterCriteria.builder() .filters(userIdentity) .build(); CreateEventSourceMappingRequest mappingRequest = CreateEventSourceMappingRequest.builder() .eventSourceArn("arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000") .batchSize(10) .enabled(Boolean.TRUE) .functionName("test_func") .startingPosition("LATEST") .filterCriteria(filterCriteria) .build(); try{ CreateEventSourceMappingResponse eventSourceMappingResponse = client.createEventSourceMapping(mappingRequest); System.out.println("The mapping ARN is "+eventSourceMappingResponse.eventSourceArn()); }catch (ServiceException e){ System.out.println(e.getMessage()); }
Node
const client = new LambdaClient({ region: "eu-west-1" }); const input = { EventSourceArn: "arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000", BatchSize: 10, Enabled: true, FunctionName: "test_func", StartingPosition: "LATEST", FilterCriteria: { "Filters": [{ "Pattern": "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }] } } const command = new CreateEventSourceMappingCommand(input); try { const results = await client.send(command); console.log(results); } catch (err) { console.error(err); }
Python
session = boto3.session.Session(region_name = 'eu-west-1') client = session.client('lambda') try: response = client.create_event_source_mapping( EventSourceArn='arn:aws:dynamodb:eu-west-1:012345678910:table/test/stream/2021-12-10T00:00:00.000', BatchSize=10, Enabled=True, FunctionName='test_func', StartingPosition='LATEST', FilterCriteria={ 'Filters': [ { 'Pattern': "{\"userIdentity\":{\"type\":[\"Service\"],\"principalId\":[\"dynamodb.amazonaws.com\"]}}" }, ] } ) print(response) except Exception as e: print(e)
JSON
{ "userIdentity": { "type": ["Service"], "principalId": ["dynamodb.amazonaws.com"] } }