5단계: 소비자 구현 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

5단계: 소비자 구현

자습서: KPL 및 KCL 1.x를 사용하여 실시간 주식 데이터 처리의 소비자 애플리케이션은 4단계: 생산자 구현에서 만든 주식 거래 스트림을 계속 처리합니다. 그런 다음 1분마다 매매된 가장 인기 있는 주식들을 출력합니다. 이 애플리케이션은 소비자 앱에 공통적인 과중한 업무를 많이 수행하는 Kinesis Client Library (KCL) 을 기반으로 하여 빌드됩니다. 자세한 정보는 KCL 1.x 소비자 개발을 참조하십시오.

소스 코드를 참조하여 다음 정보를 검토하십시오.

StockTrades프로세서 클래스

다음 작업을 수행하는 소비자의 기본 클래스가 제공됩니다.

  • 인수로 전달된 애플리케이션, 스트림 및 리전 이름을 읽습니다.

  • ~/.aws/credentials에서 자격 증명을 읽습니다.

  • RecordProcessorFactory 인스턴스에 의해 구현된 RecordProcessor의 서버 인스턴스를 제공하는 StockTradeRecordProcessor 인스턴스를 생성합니다.

  • 다음을 사용하여 KCL 워커를 만듭니다.RecordProcessorFactory인스턴스 및 스트림 이름, 자격 증명 및 애플리케이션 이름이 포함된 표준 구성입니다.

  • 작업자는 각 샤드 (이 소비자 인스턴스에 할당된 샤드) 에 대해 새 스레드를 생성하며, 이 스레드는 Kinesis Data Streams 에서 계속 반복적으로 레코드를 읽습니다. 그런 다음 RecordProcessor 인스턴스를 호출하여 수신한 각 일괄 레코드를 처리합니다.

StockTradeRecordProcessor수업

RecordProcessor 인스턴스를 구현하고 initialize, processRecordsshutdown의 세 가지 필수 메서드를 구현합니다.

이름에서 알 수 있듯이initializeshutdown은 Kinesis Client Library 에서 레코드 수신을 시작할 준비가 될 시점과 레코드 수신을 중지해야 할 시점을 각각 알리므로, 레코드 프로세서에 모든 애플리케이션별 설정 및 종료 작업을 수행할 수 있습니다. 이에 대한 코드가 제공됩니다. processRecords 메서드에서 기본 처리가 발생하며, 각 레코드에 대해 processRecord를 사용합니다. 이 후자의 메서드는 사용자에 대해 대부분의 빈 스켈레톤 코드로 제공되어 향후 설명할 다음 단계에서 구현됩니다.

원래 소스 코드에서 비어 있는 processRecord: reportStats, and resetStats에 대한 지원 메서드의 구현에 유의하십시오.

processRecords 메서드가 구현되며 다음 단계를 수행합니다.

  • 전달된 각 레코드의 경우 processRecord를 호출합니다.

  • 마지막 보고 이후 1분 이상이 경과된 경우 최신 통계를 인쇄하는 reportStats()를 호출한 후 통계를 지우는 resetStats()를 호출하여 다음 간격에 새 레코드만 포함되도록 합니다.

  • 다음 보고 시간을 설정합니다.

  • 마지막 체크포인트 이후 1분 이상이 경과된 경우 checkpoint()를 호출합니다.

  • 다음 검사 시간을 설정합니다.

이 메서드는 보고 및 검사 속도에 대해 60초 간격을 사용합니다. 검사에 대한 자세한 내용은 소비자에 대한 추가 정보 단원을 참조하십시오.

StockStats수업

이 클래스는 시간에 따른 가장 인기 있는 주식에 대한 통계 추적 및 데이터 보존을 제공합니다. 다음 메서드가 포함된 이 코드가 제공됩니다.

  • addStockTrade(StockTrade): 지정된 StockTrade를 실행 중인 통계에 삽입합니다.

  • toString(): 형식이 지정된 문자열로 통계를 반환합니다.

이 클래스는 각 주식에 대한 총 거래 수의 실행 개수와 최대 개수를 유지하여 가장 인기 있는 주식을 추적합니다. 그리고 주식 거래가 발생할 때마다 이러한 계수가 업데이트됩니다.

다음 단계에 표시된 대로 StockTradeRecordProcessor 클래스의 메서드에 코드를 추가합니다.

소비자를 구현하려면

  1. 정확한 크기의 processRecord 객체를 인스턴스화하고, 해당 객체에 레코드 데이터를 추가하고, 문제가 있는 경우 경고를 기록하여 StockTrade 메서드를 구현합니다.

    StockTrade trade = StockTrade.fromJsonAsBytes(record.getData().array()); if (trade == null) { LOG.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.getPartitionKey()); return; } stockStats.addStockTrade(trade);
  2. 간단한 reportStats 메서드를 구현합니다. 기본 설정에 대한 출력 형식을 자유롭게 수정합니다.

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. 마지막으로 resetStats 메서드를 구현합니다. 그러면 새 stockStats 인스턴스가 생성됩니다.

    stockStats = new StockStats();

소비자를 실행하려면

  1. 4단계: 생산자 구현에서 작성한 생산자를 실행하여 시뮬레이션된 주식 거래 레코드를 스트림에 첨가합니다.

  2. 앞에서 (IAM 사용자를 생성할 때) 검색한 액세스 키 및 보안 키 페어가 파일 에 저장되었는지 확인합니다.~/.aws/credentials.

  3. 다음과 같은 인수를 사용하여 StockTradesProcessor 클래스를 실행합니다.

    StockTradesProcessor StockTradeStream us-west-2

    us-west-2 이외의 리전에 스트림을 생성한 경우 여기에 해당 리전을 대신 지정해야 합니다.

1분 후 다음과 같은 출력이 표시되어야 하며, 그 이후로 매분마다 새로 고침됩니다.

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

소비자에 대한 추가 정보

Kinesis Client Library 의 이점을 잘 알고 있는 경우 에서 설명합니다.KCL 1.x 소비자 개발그리고 다른 곳에서는 여기에서 KIN을 사용해야 하는 이유에 대해 궁금할 수 있습니다. 단일 샤드 스트림과 단일 소비자 인스턴스만 사용하여 처리하는 경우에도 KCL을 사용하여 소비자를 구현하는 것이 훨씬 더 쉽습니다. 생산자 단원의 코드 구현 단계를 소비자와 비교하면 소비자 구현이 비교적 쉽다는 것을 알 수 있습니다. 이는 KCL이 제공하는 서비스로 인한 것입니다.

이 애플리케이션에서는 개별 레코드를 처리할 수 있는 레코드 프로세서 클래스를 구현하는 것에 중점을 둡니다. Kinesis Data Streams 에서 레코드를 가져오는 방식에 대해서는 걱정할 필요가 없습니다. KCL은 새 레코드가 사용 가능할 때마다 레코드를 가져오고 레코드 프로세서를 호출합니다. 또한 얼마나 많은 샤드와 소비자 인스턴스가 있는지에 대해서도 걱정할 필요가 없습니다. 스트림이 확장되면 둘 이상의 샤드 또는 소비자 인스턴스를 처리하기 위해 애플리케이션을 다시 작성할 필요가 없습니다.

검사라는 용어는 지금까지 사용 및 처리된 데이터 레코드까지 스트림의 지점을 기록하는 것을 의미하므로, 애플리케이션이 충돌할 경우 스트림의 시작이 아닌 해당 지점에서 스트림을 읽습니다. 검사 주체와 다양한 디자인 패턴 및 이에 대한 모범 사례는 이 장의 범위를 벗어나지만, 프로덕션 환경에서 직면할 수 있는 사항입니다.

에서 배운 대로4단계: 생산자 구현,putKinesis Data Streams API의 작업은파티션 키입력으로 Kinesis Data Streams 는 여러 샤드 간에 레코드를 분산하는 메커니즘으로 파티션 키를 사용합니다 (스트림에 두 개 이상의 샤드가 있는 경우). 동일한 파티션 키는 항상 동일한 샤드에 라우팅됩니다. 이를 통해 특정 샤드를 처리하는 소비자는 동일한 파티션 키가 있는 레코드는 해당 소비자에게만 전송되며, 다른 소비자에 전송될 수 없다는 가정에 기반하여 설계할 수 있습니다. 따라서 소비자의 작업자는 필요한 데이터가 누락될 수 있다는 걱정 없이 동일한 파티션 키가 있는 모든 레코드를 집계할 수 있습니다.

이 애플리케이션에서 소비자의 레코드 처리는 집약적이지 않으므로 샤드 하나를 사용하고 KCL 스레드와 동일한 스레드에서 처리할 수 있습니다. 하지만 실제로 먼저 샤드 수를 확장하는 것을 고려해 보십시오. 일부 경우에는 처리를 다른 스레드로 전환하거나, 레코드 처리가 집약적으로 예상될 경우 스레드 풀을 사용할 수 있습니다. 이러한 방식으로 KCL은 다른 스레드가 레코드를 병렬로 처리하는 동안 새 레코드를 더욱 신속하게 가져올 수 있습니다. 멀티스레드 디자인은 중요한 요소이며 고급 기술을 사용하여 접근해야 합니다. 따라서 샤드 수를 늘리는 것은 일반적으로 확장하기 위한 가장 효과적이고 쉬운 방식입니다.

다음 단계

6단계: (선택 사항) 소비자 확장