Java로 Kinesis 클라이언트 라이브러리 소비자 개발 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Java로 Kinesis 클라이언트 라이브러리 소비자 개발

Kinesis 클라이언트 라이브러리 (KCL) 를 사용하여 Kinesis 데이터 스트림의 데이터를 처리하는 애플리케이션을 구축할 수 있습니다. Kinesis Client Library는 여러 언어로 제공됩니다. 이 주제에서는 Java에 대해 설명합니다. Javadoc 참조를 보려면 클래스에 대한 AWS Javadoc 항목을 참조하십시오. AmazonKinesisClient

KCL에서 GitHub Java를 다운로드하려면 Kinesis 클라이언트 라이브러리 (Java) 로 이동하십시오. Apache KCL Maven에서 Java를 찾으려면 KCL검색 결과 페이지로 이동하십시오. 에서 GitHub Java KCL 소비자 애플리케이션용 샘플 코드를 다운로드하려면 의 KCLJava 샘플 프로젝트 페이지로 이동하십시오. GitHub

샘플 애플리케이션에 Apache Commons Logging이 사용됩니다. configure 파일에 정의된 정적 AmazonKinesisApplicationSample.java 메서드에서 로깅 구성을 변경할 수 있습니다. Log4j 및 AWS Java 응용 프로그램에서 Apache Commons Logging을 사용하는 방법에 대한 자세한 내용은 개발자 안내서의 Log4j를 사용한 로깅을 참조하십시오.AWS SDK for Java

Java로 소비자 애플리케이션을 구현할 때는 다음 작업을 완료해야 합니다. KCL

IRecordProcessor메서드를 구현하십시오.

는 KCL 현재 두 가지 버전의 IRecordProcessor 인터페이스를 지원합니다. 첫 번째 버전에서는 원래 인터페이스를 사용할 수 있고 버전 2는 버전 1.5.0부터 사용할 수 있습니다. KCL KCL 두 인터페이스 모두 완벽하게 지원되며, 특정 시나리오 요구 사항에 따라 선택할 수 있습니다. 모든 차이점을 보려면 로컬로 빌드된 Javadoc 또는 소스 코드를 참조하십시오. 다음 단원에서는 시작에 필요한 최소한의 구현을 설명합니다.

원래의 인터페이스(버전 1)

원래의 IRecordProcessor 인터페이스(package com.amazonaws.services.kinesis.clientlibrary.interfaces)는 소비자가 구현할 다음과 같은 레코드 프로세서 메서드를 노출합니다. 이 샘플에서는 시작점으로 사용할 수 있는 구현을 제공합니다(AmazonKinesisApplicationSampleRecordProcessor.java 참조).

public void initialize(String shardId) public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
초기화

레코드 프로세서가 인스턴스화될 때 initialize 메서드를 KCL 호출하여 특정 샤드 ID를 매개 변수로 전달합니다. 이 레코드 프로세서는 해당 샤드만 처리하고 일반적으로 반대의 경우도 마찬가지입니다. 이 샤드는 해당 레코드 프로세서로만 처리됩니다. 하지만 소비자는 데이터 레코드가 두 번 이상 처리될 가능성을 고려해야 합니다. Kinesis Data Streams에서는 소비자의 워커가 샤드의 모든 데이터 레코드를 적어도 한 번은 처리한다는 적어도 한 번 의미론이 통용됩니다. 둘 이상의 작업자가 특정 샤드를 처리할 수 있는 경우에 대한 자세한 내용은 리샤딩, 스케일링 및 병렬 처리를 사용하여 샤드 수를 변경합니다.를 참조하십시오.

public void initialize(String shardId)
processRecords

processRecords메서드를 KCL 호출하여 메서드에서 지정한 샤드의 데이터 레코드 목록을 전달합니다. initialize(shardId) 레코드 프로세서는 소비자의 의미론에 따라 이 레코드의 데이터를 처리합니다. 예를 들어, 워커가 데이터를 전환한 후 그 결과를 Amazon Simple Storage Service(S3) 버킷에 저장할 수 있습니다.

public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)

데이터 자체뿐 아니라 시퀀스 번호와 파티션 키도 데이터 레코드에 포함됩니다. 작업자가 데이터를 처리할 때 이 값을 사용할 수 있습니다. 예를 들어, 작업자는 파티션 키의 값을 기반으로 데이터를 저장할 S3 버킷을 선택할 수 있습니다. Record 클래스는 레코드의 데이터, 시퀀스 번호 및 파티션 키에 대한 액세스를 제공하는 다음 메서드를 노출합니다.

record.getData() record.getSequenceNumber() record.getPartitionKey()

이 샘플의 프라이빗 메서드 processRecordsWithRetries에는 작업자가 레코드의 데이터, 시퀀스 번호 및 파티션 키에 액세스하는 방법을 보여주는 코드가 있습니다.

Kinesis Data Streams는 샤드에서 이미 처리된 레코드를 추적하도록 레코드 프로세서에 요구합니다. 는 checkpointer (IRecordProcessorCheckpointer) 를 에 전달하여 이 추적을 자동으로 KCL 처리합니다. processRecords 레코드 프로세서는 이 인터페이스에서 checkpoint 메서드를 호출하여 샤드의 레코드 처리 진행 상태를 알립니다. KCL 작업자가 실패하면 KCL 는 이 정보를 사용하여 마지막으로 처리된 것으로 알려진 레코드에서 샤드 처리를 다시 시작합니다.

분할 또는 병합 작업의 경우 원본 샤드의 프로세서가 checkpoint 호출하여 원본 샤드의 모든 처리가 완료되었다는 신호를 보내기 전까지는 새 샤드 처리가 KCL 시작되지 않습니다.

매개 변수를 전달하지 않는 경우 는 를 호출하면 레코드 프로세서에 전달된 마지막 레코드까지 모든 레코드가 처리되었음을 KCL checkpoint 의미한다고 가정합니다. 따라서 레코드 프로세서는 전달된 목록에 있는 모든 레코드를 반드시 처리한 후에 checkpoint를 호출해야 합니다. 레코드 프로세서는 checkpoint를 호출할 때마다 processRecords를 호출할 필요가 없습니다. 예를 들어, 프로세서는 checkpoint를 세 번째 호출할 때마다 processRecords를 호출할 수 있습니다. 선택적으로 레코드의 정확한 시퀀스 번호를 checkpoint의 파라미터로 지정할 수도 있습니다. 이 경우 는 모든 레코드가 해당 레코드까지만 처리되었다고 KCL 가정합니다.

이 샘플에서는 프라이빗 메서드 checkpoint가 적절한 예외 처리 및 재시도 로직을 사용하여 IRecordProcessorCheckpointer.checkpoint를 호출하는 방법을 보여줍니다.

KCL는 데이터 레코드 processRecords 처리로 인해 발생하는 모든 예외를 처리합니다. 에서 processRecords 예외가 발생하면 예외 이전에 전달된 데이터 레코드를 KCL 건너뛰게 됩니다. 이러한 레코드는 예외가 발생한 프로세서 또는 소비자의 다른 레코드 프로세서로 다시 전송되지 않습니다.

shutdown

처리가 종료되거나 (종료 이유는) 작업자가 더 이상 응답하지 않을 때 (종료 이유는TERMINATE) shutdown 메서드를 KCL 호출합니다. ZOMBIE

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

샤드 분할이나 병합 또는 스트림 삭제로 인해 레코드 프로세서가 샤드에서 추가 레코드를 수신하지 않으면 처리가 종료됩니다.

또한 KCL IRecordProcessorCheckpointer 인터페이스도 전달합니다. shutdown 종료 이유가 TERMINATE이면 레코드 프로세서가 데이터 레코드 처리를 완료하고 이 인터페이스의 checkpoint 메서드를 호출해야 합니다.

업데이트된 인터페이스 (버전 2)

업데이트된 IRecordProcessor 인터페이스(package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2)는 소비자가 구현할 다음과 같은 레코드 프로세서 메서드를 노출합니다.

void initialize(InitializationInput initializationInput) void processRecords(ProcessRecordsInput processRecordsInput) void shutdown(ShutdownInput shutdownInput)

원래 인터페이스 버전의 모든 인수는 컨테이너 객체에서 get 메서드를 통해 액세스할 수 있습니다. 예를 들어, processRecords()를 사용하여 processRecordsInput.getRecords()의 레코드 목록을 검색할 수 있습니다.

이 인터페이스 버전 2 (KCL1.5.0 이상) 부터 원래 인터페이스에서 제공한 입력 외에도 다음과 같은 새 입력을 사용할 수 있습니다.

시작 시퀀스 번호

InitializationInput 작업에 전달된 initialize() 객체에서 레코드가 레코드 프로세서 인스턴스에 제공될 시작 시퀀스 번호이며, 전에 같은 샤드를 처리하는 레코드 프로세서가 마지막으로 검사한 시퀀스 번호입니다. 애플리케이션에 이 정보가 필요할 경우 제공됩니다.

보류 중인 체크포인트 시퀀스 번호

initialize() 작업에 전달된 InitializationInput 객체에서 이전 레코드 프로세서 인스턴스가 중단되기 전에 커밋되지 못한 보류 중인 체크포인트 시퀀스 번호(있는 경우)입니다.

인터페이스의 클래스 팩토리를 구현하세요. IRecordProcessor

레코드 프로세서 메서드를 구현하는 클래스 팩토리도 구현해야 합니다. 소비자가 작업자를 인스턴스화할 때 이 팩토리에 참조를 전달합니다.

이 샘플은 원래의 레코드 프로세서 인터페이스를 사용하여 AmazonKinesisApplicationSampleRecordProcessorFactory.java 파일에서 팩토리 클래스를 구현합니다. 클래스 팩토리에서 레코드 프로세서 버전 2를 만들려면 패키지 이름 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2를 사용하십시오.

public class SampleRecordProcessorFactory implements IRecordProcessorFactory { /** * Constructor. */ public SampleRecordProcessorFactory() { super(); } /** * {@inheritDoc} */ @Override public IRecordProcessor createProcessor() { return new SampleRecordProcessor(); } }

워커 생성

에서 IRecordProcessor메서드를 구현하십시오. 설명한 것처럼 KCL 레코드 프로세서 인터페이스에는 두 가지 버전 중에서 선택할 수 있으며, 이에 따라 작업자 생성 방법이 달라집니다. 원래의 레코드 프로세서 인터페이스는 다음 코드 구조를 사용하여 작업자를 생성합니다.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker(recordProcessorFactory, config);

레코드 프로세서 인터페이스 버전 2를 통해 인수의 순서와 사용할 생성자를 고민할 필요 없이 Worker.Builder를 사용하여 작업자를 만들 수 있습니다. 업데이트된 레코드 프로세서 인터페이스는 다음 코드 구조를 사용하여 작업자를 생성합니다.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();

구성 속성을 수정하십시오.

이 샘플은 구성 속성의 기본값을 제공합니다. 그러면 작업자의 이 구성 데이터가 KinesisClientLibConfiguration 객체에 통합됩니다. IRecordProcessor의 클래스 팩토리에 대한 참조와 이 객체는 작업자를 인스턴스화하는 호출에서 전달됩니다. Java 속성 파일을 사용하여 이 속성을 사용자의 값으로 재정의할 수 있습니다(AmazonKinesisApplicationSample.java 참조).

애플리케이션 이름

애플리케이션과 동일한 지역의 Amazon DynamoDB 테이블 간에 고유한 애플리케이션 이름이 KCL 필요합니다. 다음과 같이 애플리케이션 이름 구성 값이 사용됩니다.

  • 이 애플리케이션 이름과 관련된 모든 작업자는 동일한 스트림에서 함께 작업한다고 간주됩니다. 이 작업자는 여러 인스턴스에 분산되어 있을 수 있습니다. 애플리케이션 코드는 동일하지만 애플리케이션 이름이 다른 추가 인스턴스를 실행하는 경우 에서는 두 번째 인스턴스를 동일한 스트림에서 작동하는 완전히 별개의 애플리케이션으로 KCL 취급합니다.

  • 는 애플리케이션 이름을 사용하여 DynamoDB 테이블을 KCL 생성하고 이 테이블을 사용하여 애플리케이션의 상태 정보 (예: 체크포인트, 워커-샤드 매핑) 를 유지 관리합니다. 각각의 애플리케이션에는 자체 DynamoDB 테이블이 있습니다. 자세한 내용은 임대 테이블을 사용하여 소비자 애플리케이션에서 처리한 샤드를 추적할 수 있습니다. KCL 단원을 참조하십시오.

보안 인증 설정

기본 자격 증명 공급자 체인의 AWS 자격 증명 공급자 중 한 명이 자격 증명을 사용할 수 있도록 해야 합니다. 예를 들어, 소비자를 EC2 인스턴스에서 실행하는 경우 역할을 사용하여 인스턴스를 IAM 시작하는 것이 좋습니다. AWS 이 IAM 역할과 관련된 권한을 반영하는 자격 증명은 인스턴스 메타데이터를 통해 인스턴스의 애플리케이션에 제공됩니다. 이는 EC2 인스턴스에서 실행되는 소비자의 자격 증명을 관리하는 가장 안전한 방법입니다.

샘플 애플리케이션은 먼저 인스턴스 메타데이터에서 IAM 자격 증명을 검색하려고 시도합니다.

credentialsProvider = new InstanceProfileCredentialsProvider();

샘플 애플리케이션이 인스턴스 메타데이터에서 자격 증명을 가져오지 못하면 속성 파일에서 자격 증명 검색을 시도합니다.

credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

인스턴스 메타데이터에 대한 자세한 내용은 Amazon EC2 사용 설명서의 인스턴스 메타데이터를 참조하십시오.

작업자 ID를 여러 인스턴스에 사용하십시오.

샘플 초기화 코드는 로컬 컴퓨터 이름을 사용하고 다음 코드 조각과 같이 전역적으로 고유한 식별자를 추가하여 작업자의 ID인 workerId를 만듭니다. 이 방법은 단일 컴퓨터에서 소비자 애플리케이션의 여러 인스턴스가 실행되는 시나리오를 지원합니다.

String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

레코드 프로세서 인터페이스 버전 2로 마이그레이션

위에서 설명한 단계 외에도 원래의 인터페이스를 사용하는 코드를 마이그레이션하려면 다음 단계가 필요합니다.

  1. 버전 2 레코드 프로세서 인터페이스를 가져오도록 레코드 프로세서 클래스를 변경합니다.

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  2. 컨테이너 객체에서 get 메서드를 사용하도록 참조를 변경합니다. 예를 들어, shutdown() 작업에서 "checkpointer"를 "shutdownInput.getCheckpointer()"로 변경합니다.

  3. 버전 2 레코드 프로세서 팩토리 인터페이스를 가져오도록 레코드 프로세서 팩토리 클래스를 변경합니다.

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  4. Worker.Builder를 사용하도록 작업자의 구성을 변경합니다. 예:

    final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();