5단계: 소비자 구현 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

5단계: 소비자 구현

이 자습서의 소비자 애플리케이션은 데이터 스트림에서 주식 거래를 지속적으로 처리합니다. 그런 다음 1분마다 매매된 가장 인기 있는 주식들을 출력합니다. Kinesis Client Library (KCL) 를 기반으로 하는 애플리케이션은 소비자 앱에 공통적인 과중한 업무를 많이 수행합니다. 자세한 정보는 Kinesis 클라이언트 라이브러리 사용을 참조하십시오.

소스 코드를 참조하여 다음 정보를 검토하십시오.

StockTrades프로세서 클래스 클래스

다음 작업을 수행하는 소비자의 기본 클래스가 제공됩니다.

  • 인수로 전달된 애플리케이션, 데이터 스트림 및 리전 이름을 읽습니다.

  • 리전 이름을 사용하여 KinesisAsyncClient 인스턴스를 생성합니다.

  • StockTradeRecordProcessorFactory 인스턴스에 의해 구현된 ShardRecordProcessor의 서버 인스턴스를 제공하는 StockTradeRecordProcessor 인스턴스를 생성합니다.

  • KinesisAsyncClient, StreamName, ApplicationNameStockTradeRecordProcessorFactory 인스턴스를 사용하여 ConfigsBuilder 인스턴스를 생성합니다. 이 기능은 기본값을 사용하여 모든 구성을 생성하는 경우에 유용합니다.

  • ConfigsBuilder 인스턴스를 사용하여 KCL 스케줄러(이전에 KCL 버전 1.x에서는 KCL 작업자라고 함)를 생성합니다.

  • 스케줄러는 각 샤드(이 소비자 인스턴스에 할당된 샤드)에 대해 새 스레드를 생성합니다. 이 스레드는 데이터 스트림에서 계속 반복적으로 레코드를 읽습니다. 그런 다음 StockTradeRecordProcessor 인스턴스를 호출하여 수신한 각 일괄 레코드를 처리합니다.

StockTradeRecordProcessor수업

StockTradeRecordProcessor 인스턴스의 구현입니다. 이 클래스는 다시 initialize, processRecords, leaseLost, shardEndedshutdownRequested라는 다섯 가지 필수 메서드를 구현합니다.

initializeshutdownRequested 메서드는 KCL에서 레코드 수신을 시작할 준비가 될 때 및 레코드 수신을 중지해야 할 때 애플리케이션별 설정 및 종료 작업을 수행할 수 있도록 레코드 프로세서에 알리기 위해 사용됩니다. leaseLostshardEnded는 리스가 손실되거나 처리가 샤드의 끝에 도달할 때 수행할 작업에 대한 로직을 구현하는 데 사용됩니다. 이 예에서는 이러한 이벤트를 나타내는 메시지만 기록합니다.

이러한 메서드에 대한 코드가 제공됩니다. processRecords 메서드에서 기본 처리가 발생하며, 각 레코드에 대해 processRecord를 사용합니다. 이 후자의 메서드는 다음 단계에서 구현할 수 있도록 대부분 비어 있는 스켈레톤 코드로 사용자에게 제공됩니다. 이 코드에 대해서는 다음 단계에서 더 자세히 설명합니다.

또한 processRecord: reportStatsresetStats에 대한 지원 메서드의 구현도 중요합니다. 이러한 메서드는 원래 소스 코드에서 비어 있습니다.

processRecords 메서드가 구현되며 다음 단계를 수행합니다.

  • 전달된 각 레코드에 대해 processRecord를 호출합니다.

  • 마지막 보고 이후 1분 이상이 경과된 경우 최신 통계를 인쇄하는 reportStats()를 호출한 후 통계를 지우는 resetStats()를 호출하여 다음 간격에 새 레코드만 포함되도록 합니다.

  • 다음 보고 시간을 설정합니다.

  • 마지막 체크포인트 이후 1분 이상이 경과된 경우 checkpoint()를 호출합니다.

  • 다음 검사 시간을 설정합니다.

이 메서드는 보고 및 검사 속도에 대해 60초 간격을 사용합니다. 체크포인트 설정에 대한 자세한 내용은 단원을 참조하십시오.Kinesis 클라이언트 라이브러리 사용.

StockStats수업

이 클래스는 시간에 따른 가장 인기 있는 주식에 대한 통계 추적 및 데이터 보존을 제공합니다. 다음 메서드가 포함된 이 코드가 제공됩니다.

  • addStockTrade(StockTrade): 지정된 StockTrade를 실행 중인 통계에 주입합니다.

  • toString(): 형식이 지정된 문자열로 통계를 반환합니다.

이 클래스는 각 주식에 대한 총 거래 수의 실행 개수와 최대 개수를 유지하여 가장 인기 있는 주식을 추적합니다. 그리고 주식 거래가 발생할 때마다 이러한 계수가 업데이트됩니다.

다음 단계에 표시된 대로 StockTradeRecordProcessor 클래스의 메서드에 코드를 추가합니다.

소비자를 구현하려면

  1. 정확한 크기의 processRecord 객체를 인스턴스화하고, 해당 객체에 레코드 데이터를 추가하고, 문제가 있는 경우 경고를 기록하여 StockTrade 메서드를 구현합니다.

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. 간단한 reportStats 메서드를 구현합니다. 기본 설정에 적합하게 출력 형식을 자유롭게 수정합니다.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. resetStats 메서드를 구현합니다. 이 메서드는 새 stockStats 인스턴스를 생성합니다.

    stockStats = new StockStats();
  4. ShardRecordProcessor 인터페이스에 필요한 다음과 같은 메서드를 구현합니다.

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }

소비자를 실행하려면

  1. 4단계: 생산자 구현에서 작성한 생산자를 실행하여 시뮬레이션된 주식 거래 레코드를 스트림에 첨가합니다.

  2. 앞에서 (IAM 사용자를 생성할 때) 검색한 액세스 키 및 보안 키 페어가 파일 에 저장되었는지 확인합니다.~/.aws/credentials.

  3. 다음과 같은 인수를 사용하여 StockTradesProcessor 클래스를 실행합니다.

    StockTradesProcessor StockTradeStream us-west-2

    us-west-2 이외의 리전에서 스트림을 생성한 경우 해당 리전을 여기에 대신 지정해야 합니다.

1분 후 다음과 같은 출력이 표시되어야 하며, 그 이후로 매분마다 새로 고침됩니다.

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

다음 단계

6단계: (선택 사항) 소비자 확대