KCL 1.x에서 KCL 2.x로 소비자 마이그레이션 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

KCL 1.x에서 KCL 2.x로 소비자 마이그레이션

이 주제에서는 Kinesis Client Library(KCL) 버전 1.x와 버전 2.x 간의 차이점을 설명합니다. 또한, 소비자를 KCL 버전 1.x에서 버전 2.x로 마이그레이션하는 방법을 알려 드립니다. 클라이언트 마이그레이션 후 마지막 체크포인트 위치에서 처리 기록을 시작합니다.

KCL 버전 2.0에서는 다음과 같은 인터페이스 변경이 이루어졌습니다.

KCL 인터페이스 변경 사항
KCL 1.x 인터페이스 KCL 2.0 인터페이스
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor software.amazon.kinesis.processor.ShardRecordProcessor
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory software.amazon.kinesis.processor.ShardRecordProcessorFactory
com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware software.amazon.kinesis.processor.ShardRecordProcessor에 포함됨

레코드 프로세서 마이그레이션

다음은 KCL 1.x에 구현된 레코드 프로세서를 보여주는 예제입니다.

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput; public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { @Override public void initialize(InitializationInput initializationInput) { // // Setup record processor // } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { // // Process records, and possibly checkpoint // } @Override public void shutdown(ShutdownInput shutdownInput) { if (shutdownInput.getShutdownReason() == ShutdownReason.TERMINATE) { try { shutdownInput.getCheckpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { throw new RuntimeException(e); } } } @Override public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { try { checkpointer.checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow exception // e.printStackTrace(); } } }
레코드 프로세서 클래스를 마이그레이션하려면
  1. 다음과 같이 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorcom.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware의 인터페이스를 software.amazon.kinesis.processor.ShardRecordProcessor로 변경합니다.

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IShutdownNotificationAware; import software.amazon.kinesis.processor.ShardRecordProcessor; // public class TestRecordProcessor implements IRecordProcessor, IShutdownNotificationAware { public class TestRecordProcessor implements ShardRecordProcessor {
  2. importinitialize 메서드의 processRecords 문을 업데이트합니다.

    // import com.amazonaws.services.kinesis.clientlibrary.types.InitializationInput; import software.amazon.kinesis.lifecycle.events.InitializationInput; //import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput;
  3. shutdown 메서드를 새 메서드인 leaseLost, shardEnded, shutdownRequested으로 바꿉니다.

    // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shardEnded(...) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } // @Override // public void shutdownRequested(IRecordProcessorCheckpointer checkpointer) { // // // // This is moved to shutdownRequested(ShutdownReauestedInput) // // // try { // checkpointer.checkpoint(); // } catch (ShutdownException | InvalidStateException e) { // // // // Swallow exception // // // e.printStackTrace(); // } // } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } }

다음은 업데이트된 버전의 레코드 프로세서 클래스입니다.

package com.amazonaws.kcl; import software.amazon.kinesis.exceptions.InvalidStateException; import software.amazon.kinesis.exceptions.ShutdownException; import software.amazon.kinesis.lifecycle.events.InitializationInput; import software.amazon.kinesis.lifecycle.events.LeaseLostInput; import software.amazon.kinesis.lifecycle.events.ProcessRecordsInput; import software.amazon.kinesis.lifecycle.events.ShardEndedInput; import software.amazon.kinesis.lifecycle.events.ShutdownRequestedInput; import software.amazon.kinesis.processor.ShardRecordProcessor; public class TestRecordProcessor implements ShardRecordProcessor { @Override public void initialize(InitializationInput initializationInput) { } @Override public void processRecords(ProcessRecordsInput processRecordsInput) { } @Override public void leaseLost(LeaseLostInput leaseLostInput) { } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { try { shutdownRequestedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { // // Swallow the exception // e.printStackTrace(); } } }

레코드 프로세서 팩토리 마이그레이션

레코드 프로세스 팩토리는 리스가 필요할 경우 레코드 프로세서 생성을 담당합니다. 다음은 KCL 1.x 팩토리의 예입니다.

package com.amazonaws.kcl; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; public class TestRecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new TestRecordProcessor(); } }
레코드 프로세서 팩토리를 마이그레이션하려면
  1. 구현된 인터페이스를 다음과 같이 com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory에서 software.amazon.kinesis.processor.ShardRecordProcessorFactory로 변경합니다.

    // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessor; // import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; // public class TestRecordProcessorFactory implements IRecordProcessorFactory { public class TestRecordProcessorFactory implements ShardRecordProcessorFactory {
  2. createProcessor에 대한 반환 서명을 변경합니다.

    // public IRecordProcessor createProcessor() { public ShardRecordProcessor shardRecordProcessor() {

다음은 2.0의 레코드 프로세서 팩토리의 예입니다.

package com.amazonaws.kcl; import software.amazon.kinesis.processor.ShardRecordProcessor; import software.amazon.kinesis.processor.ShardRecordProcessorFactory; public class TestRecordProcessorFactory implements ShardRecordProcessorFactory { @Override public ShardRecordProcessor shardRecordProcessor() { return new TestRecordProcessor(); } }

작업자 마이그레이션

KCL 버전 2.0에서는 Scheduler라는 새로운 클래스가 Worker 클래스를 대체합니다. 다음은 KCL 1.x 워커의 예입니다.

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();
작업자를 마이그레이션하려면
  1. Worker 클래스에 대한 import 문을 SchedulerConfigsBuilder 클래스에 대한 가져오기 문으로 변경합니다.

    // import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import software.amazon.kinesis.coordinator.Scheduler; import software.amazon.kinesis.common.ConfigsBuilder;
  2. 다음 예와 같이 ConfigsBuilderScheduler를 생성합니다.

    KinesisClientUtil을 사용하여 KinesisAsyncClient를 만들고 KinesisAsyncClientmaxConcurrency를 구성하는 것이 좋습니다.

    중요

    Amazon Kinesis Client는 KinesisAsyncClient를 구성하여 KinesisAsyncClient의 전체 임대 수에 더해 추가 사용량까지 허용할 수 있을 만큼 maxConcurrency를 충분히 높이지 않을 경우 지연 시간이 크게 증가할 수 있습니다.

    import java.util.UUID; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient; import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.kinesis.common.ConfigsBuilder; import software.amazon.kinesis.common.KinesisClientUtil; import software.amazon.kinesis.coordinator.Scheduler; ... Region region = Region.AP_NORTHEAST_2; KinesisAsyncClient kinesisClient = KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region)); DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build(); CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build(); ConfigsBuilder configsBuilder = new ConfigsBuilder(streamName, applicationName, kinesisClient, dynamoClient, cloudWatchClient, UUID.randomUUID().toString(), new SampleRecordProcessorFactory()); Scheduler scheduler = new Scheduler( configsBuilder.checkpointConfig(), configsBuilder.coordinatorConfig(), configsBuilder.leaseManagementConfig(), configsBuilder.lifecycleConfig(), configsBuilder.metricsConfig(), configsBuilder.processorConfig(), configsBuilder.retrievalConfig() );

Amazon Kinesis Client 구성

Kinesis Client Library 2.0 릴리스에서는 클라이언트 구성이 단일 구성 클래스(KinesisClientLibConfiguration)에서 6개 구성 클래스로 이동했습니다. 다음 표에 마이그레이션에 대한 설명이 나와 있습니다.

구성 필드와 새 클래스
원래 필드 새 구성 클래스 설명
applicationName ConfigsBuilder 이 KCL 애플리케이션의 이름입니다. tableNameconsumerName의 기본값으로 사용됩니다.
tableName ConfigsBuilder Amazon DynamoDB 리스 테이블에 사용된 테이블 이름 재정의를 허용합니다.
streamName ConfigsBuilder 이 애플리케이션이 레코드를 처리하는 스트림의 이름입니다.
kinesisEndpoint ConfigsBuilder 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오.
dynamoDBEndpoint ConfigsBuilder 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오.
initialPositionInStreamExtended RetrievalConfig KCL의 레코드 가져오기가 시작되는 샤드의 위치입니다(애플리케이션의 초기 실행으로 시작).
kinesisCredentialsProvider ConfigsBuilder 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오.
dynamoDBCredentialsProvider ConfigsBuilder 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오.
cloudWatchCredentialsProvider ConfigsBuilder 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오.
failoverTimeMillis LeaseManagementConfig 리스 소유자가 실패했다고 간주하기 전에 전달해야 하는 시간(밀리초)입니다.
workerIdentifier ConfigsBuilder 이 애플리케이션 프로세서 인스턴스화를 나타내는 고유 식별자입니다. 고유해야 합니다.
shardSyncIntervalMillis LeaseManagementConfig 샤드 sync 호출 사이의 시간.
maxRecords PollingConfig Kinesis가 반환하는 최대 레코드 수를 설정할 수 있습니다.
idleTimeBetweenReadsInMillis CoordinatorConfig 이 옵션은 제거되었습니다. 유휴 시간 제거를 참조하십시오
callProcessRecordsEvenForEmptyRecordList ProcessorConfig 설정하면, Kinesis에서 제공된 레코드가 없는 경우에도 레코드 프로세서가 직접적으로 호출됩니다.
parentShardPollIntervalMillis CoordinatorConfig 상위 샤드가 완료되었는지를 확인하기 위해 레코드 프로세서가 폴링을 수행해야 하는 간격입니다.
cleanupLeasesUponShardCompletion LeaseManagementConfig 설정하면, 하위 리스에서 처리를 시작하자마자 리스가 제거됩니다.
ignoreUnexpectedChildShards LeaseManagementConfig 설정하면, 진행 중인 샤드가 있는 하위 샤드가 무시됩니다. 이는 주로 DynamoDB Streams용입니다.
kinesisClientConfig ConfigsBuilder 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오.
dynamoDBClientConfig ConfigsBuilder 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오.
cloudWatchClientConfig ConfigsBuilder 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오.
taskBackoffTimeMillis LifecycleConfig 실패한 작업을 재시도하기 위한 대기 시간.
metricsBufferTimeMillis MetricsConfig CloudWatch 측정치 게시를 제어합니다.
metricsMaxQueueSize MetricsConfig CloudWatch 측정치 게시를 제어합니다.
metricsLevel MetricsConfig CloudWatch 측정치 게시를 제어합니다.
metricsEnabledDimensions MetricsConfig CloudWatch 측정치 게시를 제어합니다.
validateSequenceNumberBeforeCheckpointing CheckpointConfig 이 옵션은 제거되었습니다. 체크포인트 시퀀스 번호 검증을 참조하십시오.
regionName ConfigsBuilder 이 옵션은 제거되었습니다. 클라이언트 구성 제거를 참조하십시오.
maxLeasesForWorker LeaseManagementConfig 애플리케이션의 한 인스턴스가 허용해야 하는 최대 리스 수입니다.
maxLeasesToStealAtOneTime LeaseManagementConfig 애플리케이션이 한 번에 스틸을 시도해야 하는 최대 리스 수입니다.
initialLeaseTableReadCapacity LeaseManagementConfig Kinesis Client Library에서 DynamoDB 리스 테이블을 새로 생성해야 하는 경우 사용되는 DynamoDB 읽기 IOP입니다.
initialLeaseTableWriteCapacity LeaseManagementConfig Kinesis Client Library에서 DynamoDB 리스 테이블을 새로 생성해야 하는 경우 사용되는 DynamoDB 읽기 IOP입니다.
initialPositionInStreamExtended LeaseManagementConfig 애플리케이션이 읽기를 시작해야 하는 스트림 내 초기 위치입니다. 최초 리스 생성 시에만 사용됩니다.
skipShardSyncAtWorkerInitializationIfLeasesExist CoordinatorConfig 리스 테이블에 기존 리스가 있는 경우 샤드 데이터 동기화를 비활성화합니다. TODO: KinesisEco-438
shardPrioritization CoordinatorConfig 사용할 샤드 우선 순위.
shutdownGraceMillis 해당 사항 없음 이 옵션은 제거되었습니다. MultiLang 제거를 참조하십시오.
timeoutInSeconds 해당 사항 없음 이 옵션은 제거되었습니다. MultiLang 제거를 참조하십시오.
retryGetRecordsInSeconds PollingConfig 실패에 대한 GetRecords 시도 사이의 지연을 구성합니다.
maxGetRecordsThreadPool PollingConfig GetRecords에 사용되는 스레드 풀 크기.
maxLeaseRenewalThreads LeaseManagementConfig 리스 갱신 스레드 풀의 크기를 제어합니다. 애플리케이션에서 사용할 수 있는 리스가 많을수록 이 풀의 크기가 커야 합니다.
recordsFetcherFactory PollingConfig 스트림에서 검색하는 페처를 생성하는 데 사용되는 팩토리를 교체할 수 있습니다.
logWarningForTaskAfterMillis LifecycleConfig 작업이 완료되지 않은 경우 얼마나 대기한 후 경고가 기록될지를 지정합니다.
listShardsBackoffTimeInMillis RetrievalConfig 실패 발생 시 ListShards 호출 간에 대기하는 시간(밀리초)입니다.
maxListShardsRetryAttempts RetrievalConfig 포기하기 전에 ListShards가 재시도하는 최대 횟수입니다.

유휴 시간 제거

KCL 1.x 버전에서 idleTimeBetweenReadsInMillis는 다음 두 가지 수량에 해당합니다.

  • 작업 디스패칭 확인 간의 시간. 작업 간의 이 시간은 이제 CoordinatorConfig#shardConsumerDispatchPollIntervalMillis를 설정하여 구성할 수 있습니다.

  • Kinesis Data Streams에서 반환하는 레코드가 없는 경우 절전 모드로 들어가는 시간. 버전 2.0에서는 향상된 팬아웃 레코드가 해당 검색자로부터 푸시됩니다. 샤드 소비자에 대한 작업은 푸시된 요청이 도착한 경우에만 발생합니다.

클라이언트 구성 제거

버전 2.0에서 KCL은 더 이상 클라이언트를 생성하지 않습니다. 이제 사용자가 유효한 클라이언트를 제공해야 합니다. 따라서 클라이언트 생성을 제어하던 모든 구성 파라미터는 삭제되었습니다. 이러한 파라미터가 필요한 경우 ConfigsBuilder에 클라이언트를 제공하기 전에 클라이언트에서 설정할 수 있습니다.

제거된 필드 동일 구성
kinesisEndpoint SDK KinesisAsyncClient를 원하는 엔드포인트 KinesisAsyncClient.builder().endpointOverride(URI.create("https://<kinesis endpoint>")).build()로 구성합니다.
dynamoDBEndpoint SDK DynamoDbAsyncClient를 원하는 엔드포인트 DynamoDbAsyncClient.builder().endpointOverride(URI.create("https://<dynamodb endpoint>")).build()로 구성합니다.
kinesisClientConfig SDK KinesisAsyncClient를 필요한 구성 KinesisAsyncClient.builder().overrideConfiguration(<your configuration>).build()로 구성합니다.
dynamoDBClientConfig SDK DynamoDbAsyncClient를 필요한 구성 DynamoDbAsyncClient.builder().overrideConfiguration(<your configuration>).build()로 구성합니다.
cloudWatchClientConfig SDK CloudWatchAsyncClient를 필요한 구성 CloudWatchAsyncClient.builder().overrideConfiguration(<your configuration>).build()로 구성합니다.
regionName SDK를 원하는 리전으로 구성합니다. 모든 SDK 클라이언트에 대해 동일합니다. 예: KinesisAsyncClient.builder().region(Region.US_WEST_2).build().