4단계: 생산자 구현 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

4단계: 생산자 구현

자습서: KPL 및 KCL 1.x를 사용하여 실시간 주식 데이터 처리의 애플리케이션은 실제 주식 시장 거래 모니터링의 시나리오를 사용합니다. 다음 원칙은 이 시나리오를 생산자와 지원 코드 구조에 매핑하는 방법을 간략하게 설명합니다.

소스 코드를 참조하여 다음 정보를 검토하십시오.

StockTrade수업

개별 주식 거래는 StockTrade 클래스의 인스턴스로 표시됩니다. 이 인스턴스에는 티커 기호, 가격, 공유 수, 거래 유형(구매 또는 판매), 거래를 고유하게 식별하는 ID 등의 속성이 포함됩니다. 이 클래스가 사용자를 위해 구현됩니다.

스트림 레코드

스트림은 레코드의 시퀀스입니다. 레코드는 JSON 형식으로 된 StockTrade 인스턴스의 직렬화입니다. 예:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTrade발전기 클래스

StockTradeGenerator에는 호출될 때마다 새로 생성된 임의의 주식 거래를 반환하는 getRandomTrade()라는 메서드가 있습니다. 이 클래스가 사용자를 위해 구현됩니다.

StockTrades라이터 클래스

main생산자의 방법,StockTradesWriter계속적으로 임의의 거래를 검색하고 다음 작업을 수행하여 Kinesis Data Streams 에 전송합니다.

  1. 스트림 이름과 리전 이름을 입력으로 읽습니다.

  2. AmazonKinesisClientBuilder를 생성합니다.

  3. 클라이언트 빌더를 사용하여 리전, 자격 증명 및 클라이언트 구성을 설정합니다.

  4. 클라이언트 빌더를 사용하여 AmazonKinesis 클라이언트를 빌드합니다.

  5. 스트림의 존재 여부와 활성 상태 여부를 확인합니다. 그렇지 않은 경우 오류로 종료됩니다.

  6. 연속 루프에서 StockTradeGenerator.getRandomTrade() 메서드를 호출하고 sendStockTrade 메서드를 호출하여 100밀리초마다 거래를 스트림으로 전송합니다.

sendStockTrade 클래스의 StockTradesWriter 메서드에는 다음 코드가 있습니다.

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

다음 코드 세부 분석을 참조하십시오.

  • PutRecord API에는 바이트 어레이가 필요하며, trade를 JSON 형식으로 변환해야 합니다. 이 한 줄의 코드는 다음 작업을 수행합니다.

    byte[] bytes = trade.toJsonAsBytes();
  • 거래를 전송하기 전에 새 PutRecordRequest 인스턴스(이 경우 putRecord라고 함)를 생성합니다.

    PutRecordRequest putRecord = new PutRecordRequest();

    PutRecord 호출에는 스트림 이름, 파티션 키 및 데이터 BLOB가 필요합니다. 다음 코드는 putRecord 메서드를 사용하여 setXxxx() 객체의 이러한 필드를 채웁니다.

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    이 예제는 특정 샤드에 레코드를 매핑하는 주식 티켓을 파티션 키로 사용합니다. 실제로 레코드가 스트림에 대해 균등하게 분산되도록 샤드당 수백 개 또는 수천 개의 파티션 키가 있어야 합니다. 스트림에 데이터를 추가하는 방법에 대한 자세한 내용은 스트림에 데이터 추가 단원을 참조하십시오.

    이제 putRecord를 클라이언트에 전송할 준비가 되었습니다(put 작업).

    kinesisClient.putRecord(putRecord);
  • 오류 확인과 로깅 기능은 항상 유용한 추가 기능입니다. 이 코드는 오류 조건을 기록합니다.

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    put넣기 작업에 try/catch 블록을 추가합니다.

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    Kinesis Data Streamsput네트워크 오류로 인해 또는 스트림이 처리량 제한에 도달하고 병목 현상이 발생함으로 인해 작업이 실패할 수 있습니다. 데이터 손실을 방지하기 위해 단순 재시도를 사용하는 것과 같이 put넣기 작업에 대한 재시도 정책을 신중히 고려하는 것이 좋습니다.

  • 상태 로깅은 유용하지만 선택 사항입니다.

    LOG.info("Putting trade: " + trade.toString());

여기에 표시된 생산자는 Kinesis Data Streams API 단일 레코드 기능을 사용합니다.PutRecord. 실제로 개별 생산자가 많은 레코드를 생성하는 경우 PutRecords의 여러 레코드 기능을 사용하고 레코드의 배치를 한 번에 전송하는 것이 더 효율적인 경우가 많습니다. 자세한 정보는 스트림에 데이터 추가을 참조하십시오.

생산자를 실행하려면

  1. 앞에서 (IAM 사용자를 생성할 때) 검색한 액세스 키 및 보안 키 페어가 파일 에 저장되었는지 확인합니다.~/.aws/credentials.

  2. 다음과 같은 인수를 사용하여 StockTradeWriter 클래스를 실행합니다.

    StockTradeStream us-west-2

    us-west-2 이외의 리전에 스트림을 생성한 경우 여기에 해당 리전을 대신 지정해야 합니다.

다음과 유사한 출력 화면이 표시되어야 합니다.

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Kinesis Data Streams 에서 주식 거래 스트림이 수집됩니다.

다음 단계

5단계: 소비자 구현