Amazon MSK 이벤트 소스를 통해 이벤트 필터링 사용 - AWS Lambda

Amazon MSK 이벤트 소스를 통해 이벤트 필터링 사용

이벤트 필터링을 사용하여 Lambda가 함수로 전송하는 스트림 또는 대기열의 레코드를 제어할 수 있습니다. 이벤트 필터링의 작동 방식에 대한 일반적인 내용은 Lambda가 함수로 보내는 이벤트에 대한 제어을 참조하세요.

이 섹션에서는 Amazon MSK 이벤트 소스에 대한 이벤트 필터링에 중점을 둡니다.

Amazon MSK 이벤트 필터링 기본 사항

생산자가 Amazon MSK 클러스터의 주제에 유효한 JSON 형식 또는 일반 문자열로 메시지를 작성한다고 가정해 보겠습니다. 예제 레코드는 다음과 같으며, 메시지는 value 필드에서 Base64로 인코딩된 문자열로 변환됩니다.

{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[] } ] }

Apache Kafka 생산자가 다음 JSON 형식으로 주제에 메시지를 작성하고 있다고 가정해 보겠습니다.

{ "device_ID": "AB1234", "session":{ "start_time": "yyyy-mm-ddThh:mm:ss", "duration": 162 } }

value 키를 사용하여 레코드를 필터링할 수 있습니다. device_ID가 문자 AB로 시작하는 레코드만 필터링하려고 한다고 가정해 보겠습니다. FilterCriteria 객체는 다음과 같습니다.

{ "Filters": [ { "Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }" } ] }

명확성을 더하기 위해 일반 JSON으로 확장된 필터의 Pattern 값은 다음과 같습니다.

{ "value": { "device_ID": [ { "prefix": "AB" } ] } }

콘솔, AWS CLI 또는 AWS SAM 템플릿을 사용하여 필터를 추가할 수 있습니다.

Console

콘솔을 사용하여 이 필터를 추가하려면 이벤트 소스 매핑에 필터 기준 연결(콘솔)의 지침을 따르고 필터 기준에 대해 다음 문자열을 입력합니다.

{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }
AWS CLI

AWS Command Line Interface(AWS CLI)를 사용하여 이러한 필터 기준으로 새 이벤트 소스 매핑을 생성하려면 다음 명령을 실행합니다.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'

이러한 필터 기준을 기존 이벤트 소스 매핑에 추가하려면 다음 명령을 실행합니다.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : { \"device_ID\" : [ { \"prefix\": \"AB\" } ] } }"}]}'
AWS SAM

AWS SAM을 사용하여 이 필터를 추가하려면 이벤트 소스의 YAML 템플릿에 다음 코드 조각을 추가합니다.

FilterCriteria: Filters: - Pattern: '{ "value" : { "device_ID" : [ { "prefix": "AB" } ] } }'

Amazon MSK를 사용하면 메시지가 일반 문자열인 레코드를 필터링할 수도 있습니다. 문자열이 'error'인 메시지를 무시하려고 한다고 가정해 보겠습니다. FilterCriteria 객체는 다음과 같습니다.

{ "Filters": [ { "Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }" } ] }

명확성을 더하기 위해 일반 JSON으로 확장된 필터의 Pattern 값은 다음과 같습니다.

{ "value": [ { "anything-but": [ "error" ] } ] }

콘솔, AWS CLI 또는 AWS SAM 템플릿을 사용하여 필터를 추가할 수 있습니다.

Console

콘솔을 사용하여 이 필터를 추가하려면 이벤트 소스 매핑에 필터 기준 연결(콘솔)의 지침을 따르고 필터 기준에 대해 다음 문자열을 입력합니다.

{ "value" : [ { "anything-but": [ "error" ] } ] }
AWS CLI

AWS Command Line Interface(AWS CLI)를 사용하여 이러한 필터 기준으로 새 이벤트 소스 매핑을 생성하려면 다음 명령을 실행합니다.

aws lambda create-event-source-mapping \ --function-name my-function \ --event-source-arn arn:aws:kafka:us-east-2:123456789012:cluster/my-cluster/b-8ac7cc01-5898-482d-be2f-a6b596050ea8 \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'

이러한 필터 기준을 기존 이벤트 소스 매핑에 추가하려면 다음 명령을 실행합니다.

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --filter-criteria '{"Filters": [{"Pattern": "{ \"value\" : [ { \"anything-but\": [ \"error\" ] } ] }"}]}'
AWS SAM

AWS SAM을 사용하여 이 필터를 추가하려면 이벤트 소스의 YAML 템플릿에 다음 코드 조각을 추가합니다.

FilterCriteria: Filters: - Pattern: '{ "value" : [ { "anything-but": [ "error" ] } ] }'

Amazon MSK 메시지는 UTF-8 인코딩 문자열이어야 하며, 일반 문자열이거나 JSON 형식이어야 합니다. 이는 Lambda가 필터 기준을 적용하기 전에 Amazon MSK 바이트 배열을 UTF-8로 디코딩하기 때문입니다. 메시지가 UTF-16 또는 ASCII와 같은 다른 인코딩을 사용하거나 메시지 형식이 FilterCriteria 형식과 일치하지 않는 경우 Lambda는 메타데이터 필터만 처리합니다. 다음 표에는 특정 동작이 요약되어 있습니다.

수신 메시지 형식 메시지 속성에 대한 필터 패턴 형식 결과적 작업

일반 문자열

일반 문자열

Lambda는 필터 기준에 따라 필터링합니다.

일반 문자열

데이터 속성에 대한 필터 패턴 없음

Lambda는 필터 기준에 따라(다른 메타데이터 속성에만 해당) 필터링합니다.

일반 문자열

유효한 JSON

Lambda는 필터 기준에 따라(다른 메타데이터 속성에만 해당) 필터링합니다.

유효한 JSON

일반 문자열

Lambda는 필터 기준에 따라(다른 메타데이터 속성에만 해당) 필터링합니다.

유효한 JSON

데이터 속성에 대한 필터 패턴 없음

Lambda는 필터 기준에 따라(다른 메타데이터 속성에만 해당) 필터링합니다.

유효한 JSON

유효한 JSON

Lambda는 필터 기준에 따라 필터링합니다.

UTF-8이 아닌 인코딩 문자열

JSON, 일반 문자열 또는 패턴 없음

Lambda는 필터 기준에 따라(다른 메타데이터 속성에만 해당) 필터링합니다.