자습서 #2: 필터를 사용하여 DynamoDB 및 Lambda에서 일부 이벤트 처리 - Amazon DynamoDB

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

자습서 #2: 필터를 사용하여 DynamoDB 및 Lambda에서 일부 이벤트 처리

이 자습서에서는 DynamoDB 테이블의 스트림에 있는 일부 이벤트만 처리하는 AWS Lambda 트리거를 생성합니다.

Lambda 이벤트 필터링을 사용하면 필터 표현식을 사용하여 Lambda가 어떤 이벤트를 처리를 위해 함수로 보내는지를 제어할 수 있습니다. DynamoDB Streams당 최대 5개의 서로 다른 필터를 구성할 수 있습니다. 일괄 처리 기간을 사용하는 경우 Lambda는 각 새 이벤트에 필터 기준을 적용하여 현재 배치에 포함되어야 하는지 확인합니다.

필터는 FilterCriteria라는 구조를 통해 적용됩니다. FilterCriteria의 3가지 주요 속성은 metadata properties, data propertiesfilter patterns입니다.

다음은 DynamoDB Streams 이벤트의 예제 구조입니다.

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

metadata properties는 이벤트 객체의 필드입니다. DynamoDB Streams의 경우 metadata propertiesdynamodb 또는 eventName과 같은 필드입니다.

data properties은 이벤트 본문의 필드입니다. data properties을 필터링하려면 적절한 키 내의 FilterCriteria에 해당 속성을 포함해야 합니다. DynamoDB 이벤트 소스의 경우 데이터 키는 NewImage 또는 OldImage입니다.

마지막으로 필터 규칙은 특정 속성에 적용할 필터 표현식을 정의합니다. 여기 몇 가지 예가 있습니다:

비교 연산자 규칙 구문(부분적)

Null

제품 유형이 null입니다.

{ "product_type": { "S": null } }

비어 있음

제품 이름이 비어 있습니다.

{ "product_name": { "S": [ ""] } }

같음

주가 플로리다와 같습니다.

{ "state": { "S": ["FL"] } }

And

제품 주는 플로리다와 같고 제품 범주는 초콜릿과 같습니다.

{ "state": { "S": ["FL"] } , "category": { "S": [ "CHOCOLATE"] } }

Or

제품 주는 플로리다 또는 캘리포니아입니다.

{ "state": { "S": ["FL","CA"] } }

아님

제품 주는 플로리다가 아닙니다.

{"state": {"S": [{"anything-but": ["FL"]}]}}

존재함

홈메이드 제품이 있습니다.

{"homemade": {"S": [{"exists": true}]}}

존재하지 않음

홈메이드 제품이 존재하지 않습니다

{"homemade": {"S": [{"exists": false}]}}

다음으로 시작

PK는 COMPANY로 시작합니다.

{"PK": {"S": [{"prefix": "COMPANY"}]}}

Lambda 함수에 대해 최대 5개의 이벤트 필터링 패턴을 지정할 수 있습니다. 5개의 이벤트 각각은 논리적 OR로 평가됩니다. 따라서 Filter_OneFilter_Two라는 두 개의 필터를 구성하면 Lambda 함수가 Filter_One 또는 Filter_Two를 실행합니다.

참고

Lambda 이벤트 필터링 페이지에는 숫자 값을 필터링하고 비교하는 몇 가지 옵션이 있지만 DynamoDB 필터 이벤트의 경우 DynamoDB의 숫자가 문자열로 저장되기 때문에 적용되지 않습니다. 예를 들어 "quantity": { "N": "50" }의 경우, "N" 속성 때문에 숫자임을 알 수 있습니다.

종합 - AWS CloudFormation

이벤트 필터링 기능을 실제로 보여주기 위한 샘플 CloudFormation 템플릿은 다음과 같습니다. 이 템플릿은 Amazon DynamoDB Streams가 활성화된 파티션 키 PK 및 정렬 키 SK가 있는 간단한 DynamoDB 테이블을 생성합니다. 그러면 Amazon Cloudwatch에 로그를 쓰고 Amazon DynamoDB 스트림에서 이벤트를 읽을 수 있는 람다 함수와 간단한 Lambda 실행 역할이 생성됩니다. 또한 DynamoDB Streams와 Lambda 함수 사이에 이벤트 소스 매핑을 추가하므로 Amazon DynamoDB Streams에 이벤트가 있을 때마다 함수를 실행할 수 있습니다.

AWSTemplateFormatVersion: "2010-09-09" Description: Sample application that presents AWS Lambda event source filtering with Amazon DynamoDB Streams. Resources: StreamsSampleDDBTable: Type: AWS::DynamoDB::Table Properties: AttributeDefinitions: - AttributeName: "PK" AttributeType: "S" - AttributeName: "SK" AttributeType: "S" KeySchema: - AttributeName: "PK" KeyType: "HASH" - AttributeName: "SK" KeyType: "RANGE" StreamSpecification: StreamViewType: "NEW_AND_OLD_IMAGES" ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 LambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: root PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: arn:aws:logs:*:*:* - Effect: Allow Action: - dynamodb:DescribeStream - dynamodb:GetRecords - dynamodb:GetShardIterator - dynamodb:ListStreams Resource: !GetAtt StreamsSampleDDBTable.StreamArn EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST ProcessEventLambda: Type: AWS::Lambda::Function Properties: Runtime: python3.7 Timeout: 300 Handler: index.handler Role: !GetAtt LambdaExecutionRole.Arn Code: ZipFile: | import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec) Outputs: StreamsSampleDDBTable: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.Arn StreamARN: Description: DynamoDB Table ARN created for this example Value: !GetAtt StreamsSampleDDBTable.StreamArn

이 클라우드 구성 템플릿을 배포한 후 다음과 같은 Amazon DynamoDB 항목을 삽입할 수 있습니다.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

이 클라우드 형성 템플릿에 인라인으로 포함된 간단한 람다 함수 덕분에 다음과 같이 lambda 함수에 대한 Amazon CloudWatch 로그 그룹의 이벤트를 볼 수 있습니다.

{ "eventID": "c9fbe7d0261a5163fcb6940593e41797", "eventName": "INSERT", "eventVersion": "1.1", "eventSource": "aws:dynamodb", "awsRegion": "us-east-2", "dynamodb": { "ApproximateCreationDateTime": 1664559083.0, "Keys": { "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" } }, "NewImage": { "quantity": { "N": "50" }, "company_id": { "S": "1000" }, "fabric": { "S": "Florida Chocolates" }, "price": { "N": "15" }, "stores": { "N": "5" }, "product_id": { "S": "1000" }, "SK": { "S": "PRODUCT#CHOCOLATE#DARK#1000" }, "PK": { "S": "COMPANY#1000" }, "state": { "S": "FL" }, "type": { "S": "" } }, "SequenceNumber": "700000000000888747038", "SizeBytes": 174, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "eventSourceARN": "arn:aws:dynamodb:us-east-2:111122223333:table/chocolate-table-StreamsSampleDDBTable-LUOI6UXQY7J1/stream/2022-09-30T17:05:53.209" }

필터링 예제

  • 지정된 주와 일치하는 제품만

이 예제는 플로리다에서 생산되는 모든 제품 (약어 “FL”) 과 일치하도록 CloudFormation 템플릿을 수정합니다.

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

스택을 재배포하고 나면 다음 DynamoDB 항목을 테이블에 추가할 수 있습니다. 이 예제의 제품은 캘리포니아에서 만들어진 제품이므로 Lambda 함수 로그에는 표시되지 않습니다.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK#1000", "company_id": "1000", "fabric": "Florida Chocolates", "price": 15, "product_id": "1000", "quantity": 50, "state": "CA", "stores": 5, "type": "" }
  • PK 및 SK의 일부 값으로 시작하는 항목만

이 예에서는 다음 조건을 포함하도록 CloudFormation 템플릿을 수정합니다.

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

AND 조건을 사용하려면 패턴 내부에 조건이 있어야 하며 이 경우 PK 및 SK 키가 쉼표로 구분되어 동일한 표현식 안에 있습니다.

PK 및 SK의 일부 값으로 시작하거나 특정 상태에서 시작합니다.

이 예에서는 다음 조건을 포함하도록 CloudFormation 템플릿을 수정합니다.

EventSourceDDBTableStream: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 Enabled: True FilterCriteria: Filters: - Pattern: '{"dynamodb": {"Keys": {"PK": { "S": [{ "prefix": "COMPANY" }] },"SK": { "S": [{ "prefix": "PRODUCT" }] }}}}' - Pattern: '{ "dynamodb": { "NewImage": { "state": { "S": ["FL"] } } } }' EventSourceArn: !GetAtt StreamsSampleDDBTable.StreamArn FunctionName: !GetAtt ProcessEventLambda.Arn StartingPosition: LATEST

필터 섹션에 새 패턴을 도입하여 OR 조건이 추가되었음을 알 수 있습니다.

종합 - CDK

다음 샘플 CDK 프로젝트 구성 템플릿은 이벤트 필터링 기능을 안내합니다. 이 CDK 프로젝트로 작업하기 전에 준비 스크립트 실행을 포함한 전제 조건을 설치해야 합니다.

CDK 프로젝트 만들기

먼저 빈 cdk init 디렉터리에서 호출하여 새 AWS CDK 프로젝트를 생성합니다.

mkdir ddb_filters cd ddb_filters cdk init app --language python

cdk init 명령은 프로젝트 폴더의 이름을 사용하여 클래스, 하위 폴더 및 파일을 포함한 프로젝트의 다양한 요소의 이름을 지정합니다. 폴더 이름의 하이픈은 밑줄로 변환됩니다. 그렇지 않으면 이름은 Python 식별자 형식을 따라야 합니다. 예를 들어 숫자로 시작하거나 공백을 포함해서는 안 됩니다.

새 프로젝트를 사용하려면 해당 가상 환경을 활성화하세요. 이렇게 하면 프로젝트의 종속성을 전역적으로 설치하는 대신 프로젝트 폴더에 로컬로 설치할 수 있습니다.

source .venv/bin/activate python -m pip install -r requirements.txt
참고

이를 Mac/Linux 명령으로 인식하여 가상 환경을 활성화할 수 있습니다. Python 템플릿에는 Windows에서 동일한 명령을 사용할 수 있도록 하는 배치 파일인 source.bat가 포함되어 있습니다. 기존 Windows 명령 .venv\Scripts\activate.bat도 작동합니다. AWS CDK Toolkit v1.70.0 이전 버전을 사용하여 AWS CDK 프로젝트를 초기화한 경우 가상 환경이 대신 디렉토리에 있습니다. .env .venv

기본 인프라

선호하는 텍스트 편집기로 ./ddb_filters/ddb_filters_stack.py 파일을 엽니다. 이 파일은 AWS CDK 프로젝트를 만들 때 자동 생성되었습니다.

다음으로 _create_ddb_table_set_ddb_trigger_function 함수를 추가합니다. 이러한 함수는 프로비저닝 모드 온디맨드 모드에서 파티션 키 PK 및 정렬 키 SK가 있는 DynamoDB 테이블을 생성하며 Amazon DynamoDB Streams가 기본적으로 활성화되어 새 이미지와 이전 이미지를 표시합니다.

Lambda 함수는 app.py 파일 아래의 lambda 폴더에 저장됩니다. 이 파일은 나중에 생성됩니다. 여기에는 이 스택에서 생성된 Amazon DynamoDB 테이블의 이름이 될 환경 변수 APP_TABLE_NAME이 포함됩니다. 동일한 함수에서 Lambda 함수에 스트림 읽기 권한을 부여합니다. 마지막으로 DynamoDB Streams를 람다 함수의 이벤트 소스로 구독합니다.

__init__ 메서드의 파일 끝에서 각 구성을 호출하여 스택에서 초기화합니다. 추가 구성 요소와 서비스가 필요한 더 큰 프로젝트의 경우 기본 스택 외부에서 이러한 구성을 정의하는 것이 가장 좋습니다.

import os import json import aws_cdk as cdk from aws_cdk import ( Stack, aws_lambda as _lambda, aws_dynamodb as dynamodb, ) from constructs import Construct class DdbFiltersStack(Stack): def _create_ddb_table(self): dynamodb_table = dynamodb.Table( self, "AppTable", partition_key=dynamodb.Attribute( name="PK", type=dynamodb.AttributeType.STRING ), sort_key=dynamodb.Attribute( name="SK", type=dynamodb.AttributeType.STRING), billing_mode=dynamodb.BillingMode.PAY_PER_REQUEST, stream=dynamodb.StreamViewType.NEW_AND_OLD_IMAGES, removal_policy=cdk.RemovalPolicy.DESTROY, ) cdk.CfnOutput(self, "AppTableName", value=dynamodb_table.table_name) return dynamodb_table def _set_ddb_trigger_function(self, ddb_table): events_lambda = _lambda.Function( self, "LambdaHandler", runtime=_lambda.Runtime.PYTHON_3_9, code=_lambda.Code.from_asset("lambda"), handler="app.handler", environment={ "APP_TABLE_NAME": ddb_table.table_name, }, ) ddb_table.grant_stream_read(events_lambda) event_subscription = _lambda.CfnEventSourceMapping( scope=self, id="companyInsertsOnlyEventSourceMapping", function_name=events_lambda.function_name, event_source_arn=ddb_table.table_stream_arn, maximum_batching_window_in_seconds=1, starting_position="LATEST", batch_size=1, ) def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None: super().__init__(scope, construct_id, **kwargs) ddb_table = self._create_ddb_table() self._set_ddb_trigger_function(ddb_table)

이제 Amazon에 로그를 인쇄하는 매우 간단한 람다 함수를 만들어 보겠습니다. CloudWatch 이를 위해 lambda라는 새 폴더를 만듭니다.

mkdir lambda touch app.py

자주 사용하는 텍스트 편집기를 사용하여 다음 내용을 app.py 파일에 추가합니다.

import logging LOGGER = logging.getLogger() LOGGER.setLevel(logging.INFO) def handler(event, context): LOGGER.info('Received Event: %s', event) for rec in event['Records']: LOGGER.info('Record: %s', rec)

/ddb_filters/ 폴더에 있는지 확인하고 다음 명령을 입력하여 샘플 애플리케이션을 생성합니다.

cdk deploy

어느 시점에서 솔루션 배포 여부를 확인하라는 메시지가 표시됩니다. Y를 입력하여 변경 내용을 적용합니다.

├───┼──────────────────────────────┼────────────────────────────────────────────────────────────────────────────────┤ │ + │ ${LambdaHandler/ServiceRole} │ arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole │ └───┴──────────────────────────────┴────────────────────────────────────────────────────────────────────────────────┘ Do you wish to deploy these changes (y/n)? y ... ✨ Deployment time: 67.73s Outputs: DdbFiltersStack.AppTableName = DdbFiltersStack-AppTable815C50BC-1M1W7209V5YPP Stack ARN: arn:aws:cloudformation:us-east-2:111122223333:stack/DdbFiltersStack/66873140-40f3-11ed-8e93-0a74f296a8f6

변경 사항이 배포되면 AWS 콘솔을 열고 테이블에 항목 하나를 추가합니다.

{ "PK": "COMPANY#1000", "SK": "PRODUCT#CHOCOLATE#DARK", "company_id": "1000", "type": "", "state": "FL", "stores": 5, "price": 15, "quantity": 50, "fabric": "Florida Chocolates" }

이제 이 항목의 모든 정보가 CloudWatch 로그에 포함되어야 합니다.

필터링 예제

  • 지정된 주와 일치하는 제품만

ddb_filters/ddb_filters/ddb_filters_stack.py 파일을 열고 "FL"과 동일한 모든 제품과 일치하는 필터를 포함하도록 수정합니다. 이는 45행의 event_subscription 바로 아래에서 수정할 수 있습니다.

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )
  • PK 및 SK의 일부 값으로 시작하는 항목만

다음 조건을 포함하도록 Python 스크립트를 수정합니다.

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, ] },
  • PK 및 SK의 일부 값으로 시작하거나 특정 상태에서 시작합니다.

다음 조건을 포함하도록 Python 스크립트를 수정합니다.

event_subscription.add_property_override( property_path="FilterCriteria", value={ "Filters": [ { "Pattern": json.dumps( { { "dynamodb": { "Keys": { "PK": {"S": [{"prefix": "COMPANY"}]}, "SK": {"S": [{"prefix": "PRODUCT"}]}, } } } } ) }, { "Pattern": json.dumps( {"dynamodb": {"NewImage": {"state": {"S": ["FL"]}}}} ) }, ] }, )

필터 배열에 더 많은 요소를 추가하면 OR 조건이 추가됩니다.

정리

작업 디렉터리의 베이스에서 필터 스택을 찾고 cdk destroy를 실행합니다. 리소스 삭제를 확인하라는 메시지가 표시됩니다.

cdk destroy Are you sure you want to delete: DdbFiltersStack (y/n)? y