Lambda 함수로 데이터 전처리 - SQL 애플리케이션용 Amazon Kinesis Data Analytics for SQL 애플리케이션 개발자 가이드

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Lambda 함수로 데이터 전처리

주의

새 프로젝트의 경우 SQL 애플리케이션용 Kinesis Data Analytics 분석보다 새로운 Kinesis Kinesis Data Analytics 스튜디오를 사용하는 것이 좋습니다. Kinesis Data Analytics Studio는 사용 편의성과 고급 분석 기능을 결합하여 정교한 스트림 처리 애플리케이션을 몇 분 만에 구축할 수 있도록 합니다.

스트림의 데이터에 대해 형식 변환, 전환, 강화 또는 필터링이 필요한 경우 AWS Lambda 함수를 사용하여 데이터를 사전 처리할 수 있습니다. 애플리케이션 SQL 코드가 실행되기 전에 또는 애플리케이션이 데이터 스트림에서 스키마를 생성하기 전에 이 작업을 수행할 수 있습니다.

레코드 전처리에 Lambda 함수를 사용하면 다음과 같은 시나리오에서 유용합니다.

  • 다른 형식 (예: KPL 또는 GZIP) 의 레코드를 Kinesis 데이터 분석에서 분석할 수 있는 형식으로 변환합니다. Kinesis Data Analytics 현재 JSON 또는 CSV 데이터 형식을 지원합니다.

  • 집계, 변칙 검색 등과 같은 작업을 위해 액세스하기 쉬운 형식으로 데이터 확장. 예를 들어, 여러 데이터 값을 하나의 문자열에 함께 저장할 경우 데이터를 별도 열로 확장할 수 있습니다.

  • 다른 Amazon 서비스를 통한 데이터 강화 (예: 추정 또는 오류 수정).

  • 레코드 필드에 복잡한 문자열 변환 적용

  • 데이터 정리를 위해 데이터 필터링

레코드 전처리를 위한 Lambda 함수 사용

Kinesis Data Analytics 애플리케이션을 생성할 때 소스에 Connect 페이지에서 Lambda 사전 처리를 활성화합니다.

Lambda 함수를 사용하여 Kinesis Data Analytics 애플리케이션에서 레코드를 전처리하려면
  1. AWS Management Console에 로그인하여 https://console.aws.amazon.com/kinesisanalytics에서 Kinesis Data Analytics 콘솔을 엽니다.

  2. 애플리케이션의 소스에 Connect 페이지의 레코드 사전 처리 포함AWS Lambda 섹션에서 활성화를 선택합니다.

  3. 이미 생성한 Lambda 함수를 사용하려면 Lambda 함수 드롭다운 목록에서 함수를 선택합니다.

  4. Lambda 전처리 템플릿 중 하나에서 새 Lambda 함수를 생성하려면 드롭다운 목록에서 템플릿을 선택합니다. 그런 다음 View <template name> in Lambda(Lambda에서 <템플릿 이름> 보기)를 선택하여 함수를 편집합니다.

  5. 새 Lambda 함수를 생성하려면 새로 생성을 선택합니다. Lambda 함수 생성에 대한 자세한 내용은 AWS Lambda개발자 안내서의 HelloWorld Lambda 함수 생성 및 콘솔 탐색을 참조하십시오.

  6. 사용할 Lambda 함수의 버전을 선택합니다. 최신 버전을 사용하려면 [$LATEST]를 선택합니다.

레코드 전처리를 위해 Lambda 함수를 선택하거나 생성하면 애플리케이션 SQL 코드가 실행되거나 애플리케이션이 레코드에서 스키마를 생성하기 전에 레코드가 사전 처리됩니다.

Lambda 사전 처리 권한

Lambda 전처리를 사용하려면 애플리케이션의 IAM 역할에 다음과 같은 권한 정책이 필요합니다.

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "<FunctionARN>" }

권한 정책 추가에 대한 자세한 내용은 Amazon Kinesis Data Analytics Analytics에 대한 인증 및 액세스 제어 단원을 참조하십시오.

Lambda 사전 처리 지표

CloudWatch Amazon을 사용하여 Lambda 호출 수, 처리된 바이트 수, 성공 및 실패 등을 모니터링할 수 있습니다. Kinesis Data Analytics Lambda 사전 처리를 통해 생성되는 CloudWatch 지표에 대한 자세한 내용은 Amazon Kinesis Analytics 지표를 참조하십시오.

Kinesis 프로듀서 라이브러리와AWS Lambda 함께 사용

Kinesis 프로듀서 라이브러리 (KPL) 는 사용자가 포맷한 작은 레코드를 최대 1MB의 대용량 레코드로 집계하여 Amazon Kinesis Data Streams 처리량을 더 잘 활용할 수 있도록 합니다. Java용 Kinesis Client Library (KCL) 에서 이러한 레코드 집계 해제를 지원합니다. 그러나 AWS Lambda을 스트림 소비자로 사용할 경우 특수 모듈을 사용하여 레코드를 분해해야 합니다.

필요한 프로젝트 코드 및 지침을 보려면 의 Kinesis 프로듀서 라이브러리 집계 해제 모듈을 참조하십시오AWS Lambda GitHub. 이 프로젝트의 구성 요소를 사용하여 Java, Node.js 및 Python으로 AWS Lambda에서 직렬화된 KPL 데이터를 처리할 수 있습니다. 다중 언어 KCL 애플리케이션의 일부로 이러한 구성 요소를 사용할 수 있습니다.

데이터 사전 처리 이벤트 입력 데이터 모델/레코드 응답 모델

레코드를 전처리하려면 Lambda 함수가 필수 이벤트 입력 데이터 및 레코드 응답 모델을 준수해야 합니다.

이벤트 입력 데이터 모델

Kinesis Data Analytics Kinesis Data Firehose 전송 스트림에서 데이터를 지속적으로 읽습니다. 검색한 각 레코드 배치에 대해 서비스는 각 배치가 Lambda 함수에 전달되는 방식을 관리합니다. 함수에서는 레코드 목록을 입력으로 수신합니다. 함수 내에서 목록을 반복하고 비즈니스 로직을 적용하여 사전 처리 요구 사항(예: 데이터 포맷 변환 또는 강화)을 충족합니다.

전처리 함수의 입력 모델은 Kinesis Data Firehose 전송 스트림이 데이터를 수신했는지 Kinesis Data Firehose 전송 스트림에서 데이터를 수신했는지에 따라 약간 다릅니다.

소스가 Kinesis Data Firehose 전송 스트림인 경우 이벤트 입력 데이터 모델은 다음과 같습니다.

Kinesis Data Firehose 요청 데이터 모델

필드 설명
invocationId Lambda 호출 ID (임의 GUID) 입니다.
applicationArn Kinesis Data Analytics 애플리케이션 Amazon 리소스 이름 (ARN)
streamArn 전송 스트림 ARN
레코드
필드 설명
recordId 레코드 ID(임의 GUID)
kinesisFirehoseRecordMetadata
필드 설명
approximateArrivalTimestamp 전송 스트림 레코드의 대략적인 도착 시간
data Base64 인코딩된 소스 레코드 페이로드

다음 예제는 Firehose 전송 스트림의 입력을 보여 줍니다.

{ "invocationId":"00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn":"arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn":"arn:aws:firehose:us-east-1:AAAAAAAAAAAA:deliverystream/lambda-test", "records":[ { "recordId":"49572672223665514422805246926656954630972486059535892482", "data":"aGVsbG8gd29ybGQ=", "kinesisFirehoseRecordMetadata":{ "approximateArrivalTimestamp":1520280173 } } ] }

소스가 Kinesis 데이터 스트림인 경우 이벤트 입력 데이터 모델은 다음과 같습니다.

Kinesis 스트림 요청 데이터 모델

필드 설명
invocationId Lambda 호출 ID (임의 GUID) 입니다.
applicationArn Kinesis Data Analytics 애플리케이션 ARN
streamArn 전송 스트림 ARN
레코드
필드 설명
recordId Kinesis 레코드 시퀀스 번호를 기반으로 하는 레코드 ID
kinesisStreamRecordMetadata
필드 설명
sequenceNumber Kinesis 스트림 레코드의 시퀀스 번호
partitionKey Kinesis 스트림 레코드의 파티션 키
shardId ShardId Kinesis 스트림 레코드의
approximateArrivalTimestamp 전송 스트림 레코드의 대략적인 도착 시간
data Base64 인코딩된 소스 레코드 페이로드

다음 예제는 Kinesis 데이터 스트림의 입력을 보여 줍니다.

{ "invocationId": "00540a87-5050-496a-84e4-e7d92bbaf5e2", "applicationArn": "arn:aws:kinesisanalytics:us-east-1:12345678911:application/lambda-test", "streamArn": "arn:aws:kinesis:us-east-1:AAAAAAAAAAAA:stream/lambda-test", "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "data": "aGVsbG8gd29ybGQ=", "kinesisStreamRecordMetadata":{ "shardId" :"shardId-000000000003", "partitionKey":"7400791606", "sequenceNumber":"49572672223665514422805246926656954630972486059535892482", "approximateArrivalTimestamp":1520280173 } } ] }

레코드 응답 모델

Lambda 함수에 전송되는 Lambda 사전 처리 함수(레코드 ID 포함)에서 반환된 모든 레코드가 반환되어야 합니다. 파라미터는 다음과 같은 파라미터를 포함해야 합니다. 그렇지 않으면 Kinesis Data Analytics Analytics에서 이를 거부하고 이를 데이터 전처리 실패로 간주합니다. 레코드의 데이터 페이로드 부분을 사전 처리 요구 사항에 맞게 변환할 수 있습니다.

응답 데이터 모델

레코드
필드 설명
recordId 레코드 ID는 호출 중에 Kinesis Data Analytics 분석에서 Lambda로 전달됩니다. 변환된 레코드에는 동일한 레코드 ID가 포함되어야 합니다. 원래 레코드의 ID와 변환된 레코드의 ID 간 불일치는 데이터 사전 처리 실패로 간주됩니다.
result 레코드의 데이터 변환 상태입니다. 가능한 값은 다음과 같습니다.
  • Ok: 레코드가 성공적으로 변환되었습니다. Kinesis Data Analytics SQL 처리를 위한 레코드를 수집합니다.

  • Dropped: 처리 로직에 의해 의도적으로 레코드가 삭제되었습니다. Kinesis Data Analytics SQL 처리에서 레코드를 삭제합니다. Dropped 레코드에 대해 데이터 페이로드 필드는 선택 사항입니다.

  • ProcessingFailed: 레코드를 변환할 수 없습니다. Kinesis Data Analytics Analytics에서는 Lambda 함수에서 이를 성공적으로 처리하지 못한 것으로 간주하고 오류 스트림에 오류를 기록합니다. 오류 스트림에 대한 자세한 내용은 오류 처리 단원을 참조하십시오. ProcessingFailed 레코드에 대해 데이터 페이로드 필드는 선택 사항입니다.

data base64 인코딩 후 변환된 데이터 페이로드입니다. 각 데이터 페이로드는 애플리케이션 수집 데이터 형식이 JSON인 경우 여러 JSON 문서를 포함할 수 있습니다. 또는 애플리케이션 수집 데이터 형식이 CSV인 경우 여러 CSV 행(각 행에서 지정된 행 구분 기호 사용)을 포함할 수 있습니다. Kinesis Data Analytics 서비스는 동일한 데이터 페이로드 내에서 여러 JSON 문서 또는 CSV 행으로 데이터를 성공적으로 파싱하고 처리합니다.

다음 예제는 Lambda 함수의 출력을 보여 줍니다.

{ "records": [ { "recordId": "49572672223665514422805246926656954630972486059535892482", "result": "Ok", "data": "SEVMTE8gV09STEQ=" } ] }

일반 데이터 사전 처리 기능

다음은 사전 처리에 실패할 수 있는 일반적인 이유입니다.

  • Lambda 함수로 전송된 일괄 처리의 모든 레코드 (레코드 ID 포함) 가 Kinesis Data Analytics 서비스로 다시 반환되는 것은 아닙니다.

  • 응답에 레코드 ID, 상태 또는 데이터 페이로드 필드가 없습니다. Dropped 또는 ProcessingFailed 레코드에 대해 데이터 페이로드 필드는 선택 사항입니다.

  • Lambda 함수 타임아웃은 데이터를 전처리하기에 충분하지 않습니다.

  • Lambda 함수 응답이AWS Lambda 서비스에서 부과한 응답 제한을 초과했습니다.

데이터 전처리 실패의 경우 Kinesis Data Analytics 성공할 때까지 동일한 레코드 세트에서 Lambda 호출을 계속 재시도합니다. 다음 CloudWatch 지표를 모니터링하여 장애에 대한 통찰력을 얻을 수 있습니다.

  • Kinesis Data Analytics 애플리케이션MillisBehindLatest: 애플리케이션이 스트리밍 소스에서 읽고 있는 지연 시간을 나타냅니다.

  • Kinesis Data Analytics 애플리케이션InputPreprocessing CloudWatch 지표: 다른 통계 중에서도 성공 및 실패 횟수를 나타냅니다. 자세한 내용은 Amazon Kinesis Analytics 지표를 참조하십시오.

  • AWS Lambda함수 CloudWatch 메트릭 및 로그.