소비자 구현 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

소비자 구현

이 자습서의 소비자 애플리케이션은 데이터 스트림에서 주식 거래를 지속적으로 처리합니다. 그런 다음 1분마다 매매된 가장 인기 있는 주식들을 출력합니다. 이 애플리케이션은 Kinesis Client Library () 를 기반으로 구축되었으며, Kinesis Client Library (KCL) 는 소비자 앱에서 흔히 볼 수 있는 어려운 작업을 대부분 수행합니다. 자세한 내용은 Kinesis 클라이언트 라이브러리 사용 단원을 참조하십시오.

소스 코드를 참조하여 다음 정보를 검토하십시오.

StockTradesProcessor 클래스

귀하에게 제공되는 소비자의 주요 클래스로, 다음과 같은 작업을 수행합니다.

  • 인수로 전달된 애플리케이션, 데이터 스트림 및 지역 이름을 읽습니다.

  • 지역 이름을 사용하여 KinesisAsyncClient 인스턴스를 생성합니다.

  • StockTradeRecordProcessorFactory 인스턴스에 의해 구현된 ShardRecordProcessor의 서버 인스턴스를 제공하는 StockTradeRecordProcessor 인스턴스를 생성합니다.

  • KinesisAsyncClient, StreamNameApplicationName, 및 ConfigsBuilder 인스턴스를 사용하여 StockTradeRecordProcessorFactory 인스턴스를 만듭니다. 이 기능은 기본값을 사용하여 모든 구성을 생성하는 경우에 유용합니다.

  • 인스턴스와 함께 KCL 스케줄러 (이전 KCL 버전 1.x에서는 KCL 작업자로 알려짐) 를 생성합니다. ConfigsBuilder

  • 스케줄러는 각 샤드(이 소비자 인스턴스에 할당된 샤드)에 대해 새 스레드를 생성합니다. 이 스레드는 데이터 스트림에서 계속 반복적으로 레코드를 읽습니다. 그런 다음 StockTradeRecordProcessor 인스턴스를 호출하여 수신한 각 일괄 레코드를 처리합니다.

StockTradeRecordProcessor 클래스

StockTradeRecordProcessor 인스턴스의 구현입니다. 이 클래스는 다시 initialize, processRecords, leaseLost, shardEndedshutdownRequested라는 다섯 가지 필수 메서드를 구현합니다.

initializeshutdownRequested 메서드는 레코드 프로세서가 레코드 수신을 시작할 준비가 된 시기와 레코드 수신을 중지해야 하는 시기를 각각 알려주는 데 사용되므로 애플리케이션별 설정 및 종료 작업을 수행할 수 있습니다. KCL leaseLost임대가 손실되거나 처리가 샤드 종료 시점에 이르렀을 때 취해야 할 조치에 대한 모든 논리를 구현하는 데 사용됩니다. shardEnded 이 예에서는 이러한 이벤트를 나타내는 메시지만 기록합니다.

이러한 메서드에 대한 코드가 제공됩니다. processRecords 메서드에서 기본 처리가 발생하며, 각 레코드에 대해 processRecord를 사용합니다. 이 후자의 메서드는 다음 단계에서 구현할 수 있도록 대부분 비어 있는 스켈레톤 코드로 사용자에게 제공됩니다. 이 코드에 대해서는 다음 단계에서 더 자세히 설명합니다.

또한 processRecord: reportStatsresetStats에 대한 지원 메서드의 구현도 중요합니다. 이러한 메서드는 원래 소스 코드에서 비어 있습니다.

processRecords 메서드가 구현되며 다음 단계를 수행합니다.

  • 전달된 각 레코드에 대해 processRecord를 호출합니다.

  • 마지막 보고 이후 1분 이상이 경과된 경우 최신 통계를 인쇄하는 reportStats()를 호출한 후 통계를 지우는 resetStats()를 호출하여 다음 간격에 새 레코드만 포함되도록 합니다.

  • 다음 보고 시간을 설정합니다.

  • 마지막 체크포인트 이후 1분 이상이 경과된 경우 checkpoint()를 호출합니다.

  • 다음 검사 시간을 설정합니다.

이 메서드는 보고 및 검사 속도에 대해 60초 간격을 사용합니다. 체크포인트 수행에 대한 자세한 내용은 Using the Kinesis Client Library를 참조하세요.

StockStats 클래스

이 클래스는 시간에 따른 가장 인기 있는 주식에 대한 통계 추적 및 데이터 보존을 제공합니다. 다음 메서드가 포함된 이 코드가 제공됩니다.

  • addStockTrade(StockTrade): 지정된 StockTrade를 실행 중인 통계에 주입합니다.

  • toString(): 형식이 지정된 문자열로 통계를 반환합니다.

이 클래스는 각 주식의 총 거래 수와 최대 거래 수를 누적하여 가장 인기 있는 주식을 추적합니다. 그리고 주식 거래가 발생할 때마다 이러한 계수가 업데이트됩니다.

다음 단계에 표시된 대로 StockTradeRecordProcessor 클래스의 메서드에 코드를 추가합니다.

소비자를 구현하려면
  1. 정확한 크기의 processRecord 객체를 인스턴스화하고, 해당 객체에 레코드 데이터를 추가하고, 문제가 있는 경우 경고를 기록하여 StockTrade 메서드를 구현합니다.

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. reportStats메서드를 구현하세요. 원하는 대로 출력 형식을 수정하십시오.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. resetStats 메서드를 구현합니다. 이 메서드는 새 stockStats 인스턴스를 생성합니다.

    stockStats = new StockStats();
  4. ShardRecordProcessor인터페이스에 필요한 다음 메서드를 구현하십시오.

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
소비자를 실행하려면
  1. 프로듀서 구현에서 작성한 생산자를 실행하여 시뮬레이션된 주식 거래 레코드를 스트림에 첨가합니다.

  2. 이전에 (IAM사용자를 생성할 때) 검색한 액세스 키와 비밀 키 쌍이 파일에 ~/.aws/credentials 저장되어 있는지 확인합니다.

  3. 다음과 같은 인수를 사용하여 StockTradesProcessor 클래스를 실행합니다.

    StockTradesProcessor StockTradeStream us-west-2

    us-west-2 이외의 리전에 스트림을 생성한 경우 여기에 해당 리전을 대신 지정해야 합니다.

1분 후 다음과 같은 출력이 표시되어야 하며, 그 이후로 매분마다 새로 고침됩니다.

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

다음 단계

(선택 사항) 소비자 확장