단계 4: 생산자 구현 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

단계 4: 생산자 구현

이 자습서에서는 주식 시장 거래 모니터링의 실제 시나리오를 사용합니다. 다음 원칙은 이 시나리오가 생산자와 생산자의 지원 코드 구조에 매핑되는 방법을 간략하게 설명합니다.

소스 코드를 참조하여 다음 정보를 검토하십시오.

StockTrade 클래스

개별 주식 거래는 StockTrade 클래스의 인스턴스로 표시됩니다. 이 인스턴스에는 티커 기호, 가격, 공유 수, 거래 유형(구매 또는 판매), 거래를 고유하게 식별하는 ID 등의 속성이 포함됩니다. 이 클래스가 사용자를 위해 구현됩니다.

스트림 레코드

스트림은 레코드의 시퀀스입니다. 레코드는 JSON 형식으로 된 StockTrade 인스턴스의 직렬화입니다. 예:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator 클래스

StockTradeGenerator에는 호출될 때마다 임의로 생성된 새 주식 거래를 반환하는 getRandomTrade()라는 메서드가 있습니다. 이 클래스가 사용자를 위해 구현됩니다.

StockTradesWriter 클래스

생산자의 main 메서드인 StockTradesWriter는 계속적으로 임의의 거래를 검색하고 다음 작업을 수행하여 Kinesis Data Streams에 전송합니다.

  1. 데이터 스트림 이름과 리전 이름을 입력으로 읽습니다.

  2. KinesisAsyncClientBuilder를 사용하여 리전, 자격 증명 및 클라이언트 구성을 설정합니다.

  3. 스트림의 존재 여부와 활성 상태 여부를 확인합니다. 그렇지 않은 경우 오류로 종료됩니다.

  4. 연속 루프에서 StockTradeGenerator.getRandomTrade() 메서드를 호출하고 sendStockTrade 메서드를 호출하여 100밀리초마다 거래를 스트림으로 전송합니다.

sendStockTrade 클래스의 StockTradesWriter 메서드에는 다음 코드가 있습니다.

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

다음 코드 세부 분석을 참조하십시오.

  • PutRecord API에는 바이트 배열이 필요하며, 거래를 JSON 형식으로 변환해야 합니다. 이 한 줄의 코드는 다음 작업을 수행합니다.

    byte[] bytes = trade.toJsonAsBytes();
  • 거래를 전송하기 전에 새 PutRecordRequest 인스턴스(이 경우 요청이라고 함)를 생성합니다. 각 request에는 스트림 이름, 파티션 키 및 데이터 BLOB가 필요합니다.

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    이 예제는 특정 샤드에 레코드를 매핑하는 주식 티켓을 파티션 키로 사용합니다. 실제로 레코드가 스트림에 대해 균등하게 분산되도록 샤드당 수백 개 또는 수천 개의 파티션 키가 있어야 합니다. 스트림에 데이터를 추가하는 방법에 대한 자세한 내용은 Amazon Kinesis Data Streams에 데이터 쓰기 단원을 참조하십시오.

    이제 request를 클라이언트에 전송할 준비가 되었습니다(put 작업).

    kinesisClient.putRecord(request).get();
  • 오류 확인과 로깅 기능은 항상 유용한 추가 기능입니다. 이 코드는 오류 조건을 기록합니다.

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    put넣기 작업에 try/catch 블록을 추가합니다.

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    이렇게 하는 이유는 네트워크 오류로 인해 또는 처리량 제한에 도달하여 병목 현상이 발생한 데이터 스트림으로 인해 Kinesis Data Streams put 작업이 실패할 수 있기 때문입니다. 데이터 손실을 방지하기 위해 단순 재시도 사용과 같은 put 작업에 대한 재시도 정책을 신중히 고려하는 것이 좋습니다.

  • 상태 로깅은 유용하지만 선택 사항입니다.

    LOG.info("Putting trade: " + trade.toString());

여기에 표시된 생산자는 Kinesis Data Streams API 단일 레코드 기능인 PutRecord를 사용합니다. 실제로 개별 생산자가 많은 레코드를 생성하는 경우 PutRecords의 여러 레코드 기능을 사용하고 레코드의 배치를 한 번에 전송하는 것이 더 효율적인 경우가 많습니다. 자세한 내용은 Amazon Kinesis Data Streams에 데이터 쓰기 섹션을 참조하세요.

생산자를 실행하려면
  1. 2단계: IAM 정책 및 사용자 생성에서 검색한 액세스 키 및 보안 키 페어가 파일 ~/.aws/credentials에 저장되었는지 확인합니다.

  2. 다음과 같은 인수를 사용하여 StockTradeWriter 클래스를 실행합니다.

    StockTradeStream us-west-2

    us-west-2 이외의 리전에서 스트림을 생성한 경우 해당 리전을 여기에 대신 지정해야 합니다.

다음과 유사한 출력 화면이 표시되어야 합니다.

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

이제 주식 거래가 Kinesis Data Streams에서 수집됩니다.

다음 단계

단계 5: 소비자 구현