기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
5단계: 소비자 구현
이 자습서의 소비자 애플리케이션은 데이터 스트림에서 주식 거래를 지속적으로 처리합니다. 그런 다음 1분마다 매매된 가장 인기 있는 주식들을 출력합니다. Kinesis Client Library (KCL) 를 기반으로 하는 애플리케이션은 소비자 앱에 공통적인 과중한 업무를 많이 수행합니다. 자세한 정보는 Kinesis 클라이언트 라이브러리 사용을 참조하십시오.
소스 코드를 참조하여 다음 정보를 검토하십시오.
- StockTrades프로세서 클래스 클래스
-
다음 작업을 수행하는 소비자의 기본 클래스가 제공됩니다.
-
인수로 전달된 애플리케이션, 데이터 스트림 및 리전 이름을 읽습니다.
-
리전 이름을 사용하여
KinesisAsyncClient
인스턴스를 생성합니다. -
StockTradeRecordProcessorFactory
인스턴스에 의해 구현된ShardRecordProcessor
의 서버 인스턴스를 제공하는StockTradeRecordProcessor
인스턴스를 생성합니다. -
KinesisAsyncClient
,StreamName
,ApplicationName
및StockTradeRecordProcessorFactory
인스턴스를 사용하여ConfigsBuilder
인스턴스를 생성합니다. 이 기능은 기본값을 사용하여 모든 구성을 생성하는 경우에 유용합니다. -
ConfigsBuilder
인스턴스를 사용하여 KCL 스케줄러(이전에 KCL 버전 1.x에서는 KCL 작업자라고 함)를 생성합니다. -
스케줄러는 각 샤드(이 소비자 인스턴스에 할당된 샤드)에 대해 새 스레드를 생성합니다. 이 스레드는 데이터 스트림에서 계속 반복적으로 레코드를 읽습니다. 그런 다음
StockTradeRecordProcessor
인스턴스를 호출하여 수신한 각 일괄 레코드를 처리합니다.
-
- StockTradeRecordProcessor수업
-
StockTradeRecordProcessor
인스턴스의 구현입니다. 이 클래스는 다시initialize
,processRecords
,leaseLost
,shardEnded
및shutdownRequested
라는 다섯 가지 필수 메서드를 구현합니다.initialize
및shutdownRequested
메서드는 KCL에서 레코드 수신을 시작할 준비가 될 때 및 레코드 수신을 중지해야 할 때 애플리케이션별 설정 및 종료 작업을 수행할 수 있도록 레코드 프로세서에 알리기 위해 사용됩니다.leaseLost
및shardEnded
는 리스가 손실되거나 처리가 샤드의 끝에 도달할 때 수행할 작업에 대한 로직을 구현하는 데 사용됩니다. 이 예에서는 이러한 이벤트를 나타내는 메시지만 기록합니다.이러한 메서드에 대한 코드가 제공됩니다.
processRecords
메서드에서 기본 처리가 발생하며, 각 레코드에 대해processRecord
를 사용합니다. 이 후자의 메서드는 다음 단계에서 구현할 수 있도록 대부분 비어 있는 스켈레톤 코드로 사용자에게 제공됩니다. 이 코드에 대해서는 다음 단계에서 더 자세히 설명합니다.또한
processRecord
:reportStats
및resetStats
에 대한 지원 메서드의 구현도 중요합니다. 이러한 메서드는 원래 소스 코드에서 비어 있습니다.processRecords
메서드가 구현되며 다음 단계를 수행합니다.-
전달된 각 레코드에 대해
processRecord
를 호출합니다. -
마지막 보고 이후 1분 이상이 경과된 경우 최신 통계를 인쇄하는
reportStats()
를 호출한 후 통계를 지우는resetStats()
를 호출하여 다음 간격에 새 레코드만 포함되도록 합니다. -
다음 보고 시간을 설정합니다.
-
마지막 체크포인트 이후 1분 이상이 경과된 경우
checkpoint()
를 호출합니다. -
다음 검사 시간을 설정합니다.
이 메서드는 보고 및 검사 속도에 대해 60초 간격을 사용합니다. 체크포인트 설정에 대한 자세한 내용은 단원을 참조하십시오.Kinesis 클라이언트 라이브러리 사용.
-
- StockStats수업
-
이 클래스는 시간에 따른 가장 인기 있는 주식에 대한 통계 추적 및 데이터 보존을 제공합니다. 다음 메서드가 포함된 이 코드가 제공됩니다.
-
addStockTrade(StockTrade)
: 지정된StockTrade
를 실행 중인 통계에 주입합니다. -
toString()
: 형식이 지정된 문자열로 통계를 반환합니다.
이 클래스는 각 주식에 대한 총 거래 수의 실행 개수와 최대 개수를 유지하여 가장 인기 있는 주식을 추적합니다. 그리고 주식 거래가 발생할 때마다 이러한 계수가 업데이트됩니다.
-
다음 단계에 표시된 대로 StockTradeRecordProcessor
클래스의 메서드에 코드를 추가합니다.
소비자를 구현하려면
-
정확한 크기의
processRecord
객체를 인스턴스화하고, 해당 객체에 레코드 데이터를 추가하고, 문제가 있는 경우 경고를 기록하여StockTrade
메서드를 구현합니다.byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
-
간단한
reportStats
메서드를 구현합니다. 기본 설정에 적합하게 출력 형식을 자유롭게 수정합니다.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
resetStats
메서드를 구현합니다. 이 메서드는 새stockStats
인스턴스를 생성합니다.stockStats = new StockStats();
-
ShardRecordProcessor
인터페이스에 필요한 다음과 같은 메서드를 구현합니다.@Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
소비자를 실행하려면
-
4단계: 생산자 구현에서 작성한 생산자를 실행하여 시뮬레이션된 주식 거래 레코드를 스트림에 첨가합니다.
-
앞에서 (IAM 사용자를 생성할 때) 검색한 액세스 키 및 보안 키 페어가 파일 에 저장되었는지 확인합니다.
~/.aws/credentials
. -
다음과 같은 인수를 사용하여
StockTradesProcessor
클래스를 실행합니다.StockTradesProcessor StockTradeStream us-west-2
us-west-2
이외의 리전에서 스트림을 생성한 경우 해당 리전을 여기에 대신 지정해야 합니다.
1분 후 다음과 같은 출력이 표시되어야 하며, 그 이후로 매분마다 새로 고침됩니다.
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************