프로듀서 구현 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

프로듀서 구현

이 자습서에서는 주식 시장 거래 모니터링의 실제 시나리오를 사용합니다. 다음 원칙은 이 시나리오가 생산자와 생산자의 지원 코드 구조에 매핑되는 방법을 간략하게 설명합니다.

소스 코드를 참조하여 다음 정보를 검토하십시오.

StockTrade 클래스

개별 주식 거래는 해당 StockTrade 클래스의 인스턴스로 표시됩니다. 이 인스턴스에는 티커 기호, 가격, 공유 수, 거래 유형(구매 또는 판매), 거래를 고유하게 식별하는 ID 등의 속성이 포함됩니다. 이 클래스가 사용자를 위해 구현됩니다.

스트림 레코드

스트림은 레코드의 시퀀스입니다. 레코드란 StockTrade 인스턴스를 JSON 포맷으로 직렬화한 것입니다. 예:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator 클래스

StockTradeGenerator 호출될 때마다 무작위로 생성된 새로운 주식 거래를 getRandomTrade() 반환하는 메서드가 있습니다. 이 클래스가 사용자를 위해 구현됩니다.

StockTradesWriter 클래스

생산자의 main 메서드는 다음 작업을 수행하여 StockTradesWriter 지속적으로 무작위 거래를 검색한 다음 Kinesis Data Streams로 전송합니다.

  1. 데이터 스트림 이름과 지역 이름을 입력으로 읽습니다.

  2. KinesisAsyncClientBuilder를 사용하여 지역, 자격 증명 및 클라이언트 구성을 설정합니다.

  3. 스트림의 존재 여부와 활성 상태 여부를 확인합니다. 그렇지 않은 경우 오류로 종료됩니다.

  4. 연속 루프에서 StockTradeGenerator.getRandomTrade() 메서드를 호출하고 sendStockTrade 메서드를 호출하여 100밀리초마다 거래를 스트림으로 전송합니다.

sendStockTrade 클래스의 StockTradesWriter 메서드에는 다음 코드가 있습니다.

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

다음 코드 세부 분석을 참조하십시오.

  • 에는 PutRecord API 바이트 배열이 필요하며 트레이드를 JSON 형식으로 변환해야 합니다. 이 한 줄의 코드는 다음 작업을 수행합니다.

    byte[] bytes = trade.toJsonAsBytes();
  • 거래를 전송하기 전에 새 PutRecordRequest 인스턴스(이 경우 요청이라고 함)를 생성합니다. 각 request에는 스트림 이름, 파티션 키 및 데이터 BLOB가 필요합니다.

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    이 예제에서는 주식 시세를 파티션 키로 사용하여 레코드를 특정 샤드에 매핑합니다. 실제로 레코드가 스트림에 대해 균등하게 분산되도록 샤드당 수백 개 또는 수천 개의 파티션 키가 있어야 합니다. 스트림에 데이터를 추가하는 방법에 대한 자세한 내용은 Amazon Kinesis Data Streams에 데이터 쓰기 단원을 참조하십시오.

    이제 request를 클라이언트에 전송할 준비가 되었습니다(put 작업).

    kinesisClient.putRecord(request).get();
  • 오류 확인과 로깅 기능은 항상 유용한 추가 기능입니다. 이 코드는 오류 조건을 기록합니다.

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    put넣기 작업에 try/catch 블록을 추가합니다.

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    이렇게 하는 이유는 네트워크 오류로 인해 또는 처리량 제한에 도달하여 병목 현상이 발생한 데이터 스트림으로 인해 Kinesis Data Streams put 작업이 실패할 수 있기 때문입니다. 데이터 손실을 방지하기 위한 put 작업 (예: 재시도 사용) 에 대한 재시도 정책을 신중하게 고려하는 것이 좋습니다.

  • 상태 로깅은 유용하지만 선택 사항입니다.

    LOG.info("Putting trade: " + trade.toString());

여기에 표시된 프로듀서는 Kinesis Data API Streams 단일 레코드 PutRecord 기능인 을 사용합니다. 실제로 개별 생산자가 많은 레코드를 생성하는 경우 PutRecords의 여러 레코드 기능을 사용하고 레코드의 배치를 한 번에 전송하는 것이 더 효율적인 경우가 많습니다. 자세한 내용은 Amazon Kinesis Data Streams에 데이터 쓰기 단원을 참조하십시오.

생산자를 실행하려면
  1. 정책 및 사용자 만들기 IAM에서 검색한 액세스 키 및 보안 키 페어가 파일 ~/.aws/credentials에 저장되었는지 확인합니다.

  2. 다음과 같은 인수를 사용하여 StockTradeWriter 클래스를 실행합니다.

    StockTradeStream us-west-2

    us-west-2 이외의 리전에서 스트림을 생성한 경우 해당 리전을 여기에 대신 지정해야 합니다.

다음과 유사한 출력 화면이 표시되어야 합니다.

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

이제 주식 거래가 Kinesis Data Streams에서 수집됩니다.

다음 단계

소비자 구현