기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
5단계: 소비자 구현
자습서: KPL 및 KCL 1.x를 사용하여 실시간 주식 데이터 처리의 소비자 애플리케이션은 4단계: 생산자 구현에서 만든 주식 거래 스트림을 계속 처리합니다. 그런 다음 1분마다 매매된 가장 인기 있는 주식들을 출력합니다. 이 애플리케이션은 소비자 앱에 공통적인 과중한 업무를 많이 수행하는 Kinesis Client Library (KCL) 을 기반으로 하여 빌드됩니다. 자세한 정보는 KCL 1.x 소비자 개발을 참조하십시오.
소스 코드를 참조하여 다음 정보를 검토하십시오.
- StockTrades프로세서 클래스
-
다음 작업을 수행하는 소비자의 기본 클래스가 제공됩니다.
-
인수로 전달된 애플리케이션, 스트림 및 리전 이름을 읽습니다.
-
~/.aws/credentials
에서 자격 증명을 읽습니다. -
RecordProcessorFactory
인스턴스에 의해 구현된RecordProcessor
의 서버 인스턴스를 제공하는StockTradeRecordProcessor
인스턴스를 생성합니다. -
다음을 사용하여 KCL 워커를 만듭니다.
RecordProcessorFactory
인스턴스 및 스트림 이름, 자격 증명 및 애플리케이션 이름이 포함된 표준 구성입니다. -
작업자는 각 샤드 (이 소비자 인스턴스에 할당된 샤드) 에 대해 새 스레드를 생성하며, 이 스레드는 Kinesis Data Streams 에서 계속 반복적으로 레코드를 읽습니다. 그런 다음
RecordProcessor
인스턴스를 호출하여 수신한 각 일괄 레코드를 처리합니다.
-
- StockTradeRecordProcessor수업
-
RecordProcessor
인스턴스를 구현하고initialize
,processRecords
및shutdown
의 세 가지 필수 메서드를 구현합니다.이름에서 알 수 있듯이
initialize
과shutdown
은 Kinesis Client Library 에서 레코드 수신을 시작할 준비가 될 시점과 레코드 수신을 중지해야 할 시점을 각각 알리므로, 레코드 프로세서에 모든 애플리케이션별 설정 및 종료 작업을 수행할 수 있습니다. 이에 대한 코드가 제공됩니다.processRecords
메서드에서 기본 처리가 발생하며, 각 레코드에 대해processRecord
를 사용합니다. 이 후자의 메서드는 사용자에 대해 대부분의 빈 스켈레톤 코드로 제공되어 향후 설명할 다음 단계에서 구현됩니다.원래 소스 코드에서 비어 있는
processRecord
:reportStats
, andresetStats
에 대한 지원 메서드의 구현에 유의하십시오.processRecords
메서드가 구현되며 다음 단계를 수행합니다.-
전달된 각 레코드의 경우
processRecord
를 호출합니다. -
마지막 보고 이후 1분 이상이 경과된 경우 최신 통계를 인쇄하는
reportStats()
를 호출한 후 통계를 지우는resetStats()
를 호출하여 다음 간격에 새 레코드만 포함되도록 합니다. -
다음 보고 시간을 설정합니다.
-
마지막 체크포인트 이후 1분 이상이 경과된 경우
checkpoint()
를 호출합니다. -
다음 검사 시간을 설정합니다.
이 메서드는 보고 및 검사 속도에 대해 60초 간격을 사용합니다. 검사에 대한 자세한 내용은 소비자에 대한 추가 정보 단원을 참조하십시오.
-
- StockStats수업
-
이 클래스는 시간에 따른 가장 인기 있는 주식에 대한 통계 추적 및 데이터 보존을 제공합니다. 다음 메서드가 포함된 이 코드가 제공됩니다.
-
addStockTrade(StockTrade)
: 지정된StockTrade
를 실행 중인 통계에 삽입합니다. -
toString()
: 형식이 지정된 문자열로 통계를 반환합니다.
이 클래스는 각 주식에 대한 총 거래 수의 실행 개수와 최대 개수를 유지하여 가장 인기 있는 주식을 추적합니다. 그리고 주식 거래가 발생할 때마다 이러한 계수가 업데이트됩니다.
-
다음 단계에 표시된 대로 StockTradeRecordProcessor
클래스의 메서드에 코드를 추가합니다.
소비자를 구현하려면
-
정확한 크기의
processRecord
객체를 인스턴스화하고, 해당 객체에 레코드 데이터를 추가하고, 문제가 있는 경우 경고를 기록하여StockTrade
메서드를 구현합니다.StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
-
간단한
reportStats
메서드를 구현합니다. 기본 설정에 대한 출력 형식을 자유롭게 수정합니다.System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
-
마지막으로
resetStats
메서드를 구현합니다. 그러면 새stockStats
인스턴스가 생성됩니다.stockStats = new StockStats();
소비자를 실행하려면
-
4단계: 생산자 구현에서 작성한 생산자를 실행하여 시뮬레이션된 주식 거래 레코드를 스트림에 첨가합니다.
-
앞에서 (IAM 사용자를 생성할 때) 검색한 액세스 키 및 보안 키 페어가 파일 에 저장되었는지 확인합니다.
~/.aws/credentials
. -
다음과 같은 인수를 사용하여
StockTradesProcessor
클래스를 실행합니다.StockTradesProcessor StockTradeStream us-west-2
us-west-2
이외의 리전에 스트림을 생성한 경우 여기에 해당 리전을 대신 지정해야 합니다.
1분 후 다음과 같은 출력이 표시되어야 하며, 그 이후로 매분마다 새로 고침됩니다.
****** Shard shardId-000000000001 stats for last 1 minute ******
Most popular stock being bought: WMT, 27 buys.
Most popular stock being sold: PTR, 14 sells.
****************************************************************
소비자에 대한 추가 정보
Kinesis Client Library 의 이점을 잘 알고 있는 경우 에서 설명합니다.KCL 1.x 소비자 개발그리고 다른 곳에서는 여기에서 KIN을 사용해야 하는 이유에 대해 궁금할 수 있습니다. 단일 샤드 스트림과 단일 소비자 인스턴스만 사용하여 처리하는 경우에도 KCL을 사용하여 소비자를 구현하는 것이 훨씬 더 쉽습니다. 생산자 단원의 코드 구현 단계를 소비자와 비교하면 소비자 구현이 비교적 쉽다는 것을 알 수 있습니다. 이는 KCL이 제공하는 서비스로 인한 것입니다.
이 애플리케이션에서는 개별 레코드를 처리할 수 있는 레코드 프로세서 클래스를 구현하는 것에 중점을 둡니다. Kinesis Data Streams 에서 레코드를 가져오는 방식에 대해서는 걱정할 필요가 없습니다. KCL은 새 레코드가 사용 가능할 때마다 레코드를 가져오고 레코드 프로세서를 호출합니다. 또한 얼마나 많은 샤드와 소비자 인스턴스가 있는지에 대해서도 걱정할 필요가 없습니다. 스트림이 확장되면 둘 이상의 샤드 또는 소비자 인스턴스를 처리하기 위해 애플리케이션을 다시 작성할 필요가 없습니다.
검사라는 용어는 지금까지 사용 및 처리된 데이터 레코드까지 스트림의 지점을 기록하는 것을 의미하므로, 애플리케이션이 충돌할 경우 스트림의 시작이 아닌 해당 지점에서 스트림을 읽습니다. 검사 주체와 다양한 디자인 패턴 및 이에 대한 모범 사례는 이 장의 범위를 벗어나지만, 프로덕션 환경에서 직면할 수 있는 사항입니다.
에서 배운 대로4단계: 생산자 구현,put
Kinesis Data Streams API의 작업은파티션 키입력으로 Kinesis Data Streams 는 여러 샤드 간에 레코드를 분산하는 메커니즘으로 파티션 키를 사용합니다 (스트림에 두 개 이상의 샤드가 있는 경우). 동일한 파티션 키는 항상 동일한 샤드에 라우팅됩니다. 이를 통해 특정 샤드를 처리하는 소비자는 동일한 파티션 키가 있는 레코드는 해당 소비자에게만 전송되며, 다른 소비자에 전송될 수 없다는 가정에 기반하여 설계할 수 있습니다. 따라서 소비자의 작업자는 필요한 데이터가 누락될 수 있다는 걱정 없이 동일한 파티션 키가 있는 모든 레코드를 집계할 수 있습니다.
이 애플리케이션에서 소비자의 레코드 처리는 집약적이지 않으므로 샤드 하나를 사용하고 KCL 스레드와 동일한 스레드에서 처리할 수 있습니다. 하지만 실제로 먼저 샤드 수를 확장하는 것을 고려해 보십시오. 일부 경우에는 처리를 다른 스레드로 전환하거나, 레코드 처리가 집약적으로 예상될 경우 스레드 풀을 사용할 수 있습니다. 이러한 방식으로 KCL은 다른 스레드가 레코드를 병렬로 처리하는 동안 새 레코드를 더욱 신속하게 가져올 수 있습니다. 멀티스레드 디자인은 중요한 요소이며 고급 기술을 사용하여 접근해야 합니다. 따라서 샤드 수를 늘리는 것은 일반적으로 확장하기 위한 가장 효과적이고 쉬운 방식입니다.