Node.js 에서 Kinesis 클라이언트 라이브러리 소비자 개발 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Node.js 에서 Kinesis 클라이언트 라이브러리 소비자 개발

Kinesis Client Library (KCL) 를 사용하여 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션을 빌드합니다. Kinesis Client Library 은 여러 언어로 제공됩니다. 이 주제에서는 Node.js에 대해 설명합니다.

KCL은 Java 라이브러리이며, Java 이외의 언어에 대한 지원은MultiLang데몬. 이 데몬은 Java 기반이며, Java 이외의 KCL 언어를 사용하는 경우 배경에서 실행됩니다. 따라서 Node.js 용 KCL을 설치하고 Node.js 로만 소비자 앱을 작성한 경우에도 시스템에 Java를 설치해야 합니다.MultiLang데몬. 또한,MultiLang데몬에는 사용 사례에 맞게 사용자 지정해야 하는 몇 가지 기본 설정이 있습니다 (예:AWS연결되는 지역입니다. 에 대한 자세한 내용MultiLang데몬GitHub로 이동합니다.KCLMultiLang데몬 프로젝트페이지.

에서 Node.js KCL을 다운로드하려면GitHub로 이동합니다.Kinesis 클라이언트 라이브러리 (Node.js).

샘플 코드 다운로드

Node.js KCL에 두 가지 코드 샘플을 사용할 수 있습니다.

  • basic-sample

    Node.js KCL 소비자 애플리케이션을 빌드하기 위한 기초를 설명하기 위해 다음 단원에서 사용됩니다.

  • click-stream-sample

    기본 샘플 코드에 익숙해진 후에 실제 시나리오를 사용할 수 있는 약간 높은 수준의 코드입니다. 여기서는 이 샘플을 설명하지 않으며 자세한 내용은 README 파일에 있습니다.

Node.js KCL 소비자 애플리케이션을 구현할 때 다음 작업을 완료해야 합니다.

레코드 프로세서 구현

Node.js 용 KCL을 사용하는 가능한 한 가장 단순한 소비자는recordProcessor함수, 차례로 함수가 포함되어 있습니다.initialize,processRecords, 및shutdown. 이 샘플에서는 시작점으로 사용할 수 있는 구현을 제공합니다(sample_kcl_app.js 참조).

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}

initialize

KCL은 다음을 호출합니다.initialize레코드 프로세서가 시작되면 작동합니다. 이 레코드 프로세서는 initializeInput.shardId로 전달된 샤드 ID만 처리하고 일반적으로 반대의 경우도 마찬가지입니다. 이 샤드는 해당 레코드 프로세서로만 처리됩니다. 하지만 소비자는 데이터 레코드가 두 번 이상 처리될 가능성을 고려해야 합니다. 이는 Kinesis Data Streams 스트림에최소의미론은 소비자의 작업자가 샤드의 모든 데이터 레코드를 적어도 한 번은 처리한다는 의미론입니다. 둘 이상의 작업자가 특정 샤드를 처리할 수 있는 경우에 대한 자세한 내용은 리샤딩, 크기 조정 및 병렬 처리를 참조하십시오.

initialize: function(initializeInput, completeCallback)

processRecords

KCL은 지정된 샤드의 데이터 레코드 목록을 포함하는 입력으로 이 함수를 호출합니다.initialize함수. 구현하는 레코드 프로세서가 소비자의 의미론에 따라 이 레코드의 데이터를 처리합니다. 예를 들어, 작업자가 데이터를 전환한 후 그 결과를 Amazon Simple Storage Service (Amazon S3) 버킷에 저장할 수 있습니다.

processRecords: function(processRecordsInput, completeCallback)

데이터 자체뿐 아니라 작업자가 데이터를 처리할 때 사용할 수 있는 시퀀스 번호와 파티션 키도 데이터 레코드에 포함됩니다. 예를 들어, 작업자는 파티션 키의 값을 기반으로 데이터를 저장할 S3 버킷을 선택할 수 있습니다. record 딕셔너리가 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하기 위해 다음 키-값 페어를 노출합니다.

record.data record.sequenceNumber record.partitionKey

데이터가 Base64로 인코딩됩니다.

기본 샘플에서 processRecords 함수에는 작업자가 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하는 방법을 보여주는 코드가 있습니다.

Kinesis Data Streams Streams의 경우 는 샤드에서 이미 처리된 레코드를 추적하도록 레코드 프로세서에 요구합니다. KCL은 다음을 사용하여 이 추적을 처리합니다.checkpointer다음과 같이 전달된 객체processRecordsInput.checkpointer. 레코드 프로세서가 다음을 호출합니다.checkpointer.checkpoint은 KCL에 샤드의 레코드 처리가 얼마나 진행되었는지를 KCL에 알려줍니다. 작업자가 실패할 경우 KCL은 샤드 처리를 다시 시작할 때 마지막으로 처리된 레코드에서 계속되도록 이 정보를 사용합니다.

분할 또는 병합 작업의 경우 원본 샤드의 프로세서가 를 호출할 때까지 KCL이 새 샤드의 처리를 시작하지 않습니다.checkpoint원본 샤드의 모든 처리가 완료되었음을 알립니다.

시퀀스 번호를 전달하지 않는 경우checkpoint함수, KCL은 에 대한 호출을 가정합니다.checkpoint은 레코드 프로세서에 전달된 마지막 레코드까지 모두 처리되었다는 의미로 간주합니다. 따라서 레코드 프로세서는 다음을 호출해야 합니다.checkpoint 목록에 전달된 모든 레코드를 처리한 후 레코드 프로세서는 checkpoint를 호출할 때마다 processRecords를 호출할 필요가 없습니다. 예를 들어, 프로세서는 세 번째 호출마다 checkpoint 또는 구현된 사용자 지정 검증/확인 서비스와 같은 레코드 프로세서 외부의 이벤트를 호출할 수 있습니다.

선택적으로 레코드의 정확한 시퀀스 번호를 checkpoint의 파라미터로 지정할 수도 있습니다. 이 경우 KCL은 모든 레코드가 해당 레코드까지만 처리되었다고 간주합니다.

기본 샘플 애플리케이션은 checkpointer.checkpoint 함수의 가장 단순하고 가능한 호출을 보여줍니다. 소비자에 필요한 다른 검사 로직을 함수의 이 지점에 추가할 수 있습니다.

shutdown

KCL은 다음을 호출합니다.shutdown처리가 끝날 때 함수 중 하나 (shutdownInput.reason입니다TERMINATE) 또는 작업자가 더 이상 응답하지 않습니다 (shutdownInput.reason입니다ZOMBIE).

shutdown: function(shutdownInput, completeCallback)

샤드 분할이나 병합 또는 스트림 삭제로 인해 레코드 프로세서가 샤드에서 추가 레코드를 수신하지 않으면 처리가 종료됩니다.

KCL은 또한 a를 통과합니다.shutdownInput.checkpointer객체가 있는shutdown. 종료 이유가 TERMINATE이면 레코드 프로세서가 모든 데이터 레코드 처리를 완료했는지 확인하고 이 인터페이스의 checkpoint 함수를 호출해야 합니다.

구성 속성 수정

이 샘플은 구성 속성의 기본값을 제공합니다. 이 속성을 사용자의 값으로 재정의할 수 있습니다(기본 샘플의 sample.properties 참조).

애플리케이션 이름

KCL에는 애플리케이션 및 같은 리전의 Amazon DynamoDB 테이블에서 고유한 애플리케이션이 필요합니다. 다음과 같이 애플리케이션 이름 구성 값이 사용됩니다.

  • 이 애플리케이션 이름과 관련된 모든 작업자는 동일한 스트림에서 함께 작업한다고 간주됩니다. 이 작업자는 여러 인스턴스에 분산되어 있을 수 있습니다. 동일한 애플리케이션 코드의 추가 인스턴스를 다른 애플리케이션 이름으로 실행하는 경우 KCL은 두 번째 인스턴스를 동일한 스트림에서 작동하는 완전히 별개의 애플리케이션으로 취급합니다.

  • KCL은 애플리케이션 이름이 있는 DynamoDB 테이블을 만들고 테이블을 사용하여 애플리케이션의 상태 정보 (예: 체크포인트 및 작업자와 샤드의 매핑) 를 보관합니다. 각각의 애플리케이션에는 자체 DynamoDB 테이블이 있습니다. 자세한 정보는 임대 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리된 샤드 추적을 참조하십시오.

자격 증명 설정

반드시 만들어야 합니다.AWS기본 자격 증명 공급자 체인의 자격 증명 공급자 중 하나에 대해 자격 증명을 사용할 수 있는 자격 증명 AWSCredentialsProvider 속성을 사용하여 자격 증명 공급자를 설정할 수 있습니다. sample.properties 파일에서 기본 자격 증명 공급자 체인의 자격 증명 공급자 중 하나에 자격 증명을 사용할 수 있도록 해야 합니다. Amazon EC2 인스턴스에서 소비자를 실행하는 경우 IAM 역할로 인스턴스를 구성하는 것이 좋습니다.AWS이 IAM 역할과 관련된 권한을 반영하는 권한을 반영하는 인스턴스 자격 증명은 인스턴스 메타테이터를 통해 인스턴스의 애플리케이션에서 사용할 수 있습니다. 이것이 EC2 인스턴스에서 실행되는 소비자 애플리케이션의 자격 증명을 관리하는 가장 안전한 방법입니다.

다음 예제는 Kinesis 데이터 스트림을 처리하도록 Kinesis 데이터 스트림을 구성합니다.kclnodejssample에서 제공된 레코드 프로세서 사용sample_kcl_app.js:

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON