プロデューサーの実装 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

プロデューサーの実装

このチュートリアルでは、株式市場取引をモニタリングする実際のシナリオを使用しています。以下の原理によって、このシナリオをプロデューサーおよびサポートコード構造にマッピングできます。

ソースコードを参照し、以下の情報を確認します。

StockTrade クラス

個々の株式取引は、クラスのインスタンスによって表されます StockTrade。このインスタンスには、ティッカーシンボル、株価、株数、取引のタイプ (買いまたは売り)、取引を一意に識別する ID などの属性が含まれます。このクラスは、既に実装されています。

ストリームレコード

ストリームとは、一連のレコードのことです。レコードは、 JSON形式のStockTradeインスタンスのシリアル化です。例:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator クラス

StockTradeGenerator には、ランダムに生成された新しい株式取引getRandomTrade()が呼び出されるたびにそれを返す というメソッドがあります。このクラスは、既に実装されています。

StockTradesWriter クラス

プロデューサーの mainメソッドは、 StockTradesWriterランダム取引を継続的に取得し、次のタスクを実行して Kinesis Data Streams に送信します。

  1. データストリーム名とリージョン名を入力として読み取ります。

  2. を使用してKinesisAsyncClientBuilder、リージョン、認証情報、およびクライアント設定を設定します。

  3. ストリームが存在し、アクティブであることを確認します (そうでない場合は、エラーで終了します)。

  4. 連続ループで、StockTradeGenerator.getRandomTrade() メソッドに続き sendStockTrade メソッドを呼び出して、100 ミリ秒ごとに取引をストリームに送信します。

sendStockTrade クラスの StockTradesWriter メソッドには次のコードがあります。

private static void sendStockTrade(StockTrade trade, KinesisAsyncClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); } }

次のコードの詳細を参照してください。

  • はバイト配列PutRecordAPIを想定しており、取引をJSON形式に変換する必要があります。この操作は、次の 1 行のコードによって行われます。

    byte[] bytes = trade.toJsonAsBytes();
  • 取引を送信する前に、新しい PutRecordRequest インスタンス (この場合は request) を作成する必要があります。各 request には、ストリーム名、パーティションキー、データ BLOB が必要です。

    PutPutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build();

    この例では、レコードを特定のシャードにマッピングするパーティションキーとして株式ティッカーを使用しています。実際には、レコードがストリーム全体に均等に分散するように、シャード 1 つあたりに数百個または数千個のパーティションキーを用意する必要があります。ストリームにデータを追加する方法の詳細については、Amazon Kinesis Data Streams へのデータの書き込みを参照してください。

    次に、request をクライアントに送信できます (put オペレーション)。

    kinesisClient.putRecord(request).get();
  • エラーチェックとログ記録は、いつでも追加して損はありません。次のコードによって、エラー状態を記録します。

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    put オペレーションの前後に try/catch ブロックを追加します。

    try { kinesisClient.putRecord(request).get(); } catch (InterruptedException e) { LOG.info("Interrupted, assuming shutdown."); } catch (ExecutionException e) { LOG.error("Exception while sending data to Kinesis. Will try again next cycle.", e); }

    これは、ネットワークエラーや、ストリームがスループット制限を超えて抑制されたことが原因で、Kinesis Data Streams の put オペレーションが失敗することがあるためです。再試行を使用するなど、データ損失を避けるため、putオペレーションの再試行ポリシーを慎重に検討することをお勧めします。

  • ステータスのログ記録は有益ですが、オプションです。

    LOG.info("Putting trade: " + trade.toString());

ここに示すプロデューサーは、Kinesis Data Streams のAPI単一レコード機能 を使用しますPutRecord。実際には、個々のプロデューサーで大量のレコードが生成される場合があります。その場合、PutRecords のマルチレコード機能を使用して、レコードのバッチを一度に送信する方が効率的です。詳細については、Amazon Kinesis Data Streams へのデータの書き込みを参照してください。

プロデューサーを実行するには
  1. IAM ポリシーとユーザーを作成する で取得したアクセスキーとシークレットキーのペアがファイル ~/.aws/credentials に保存されていることを確認します。

  2. 次の引数を指定して StockTradeWriter クラスを実行します。

    StockTradeStream us-west-2

    us-west-2 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

次のような出力が表示されます:

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Kinesis Data Streams によって株式取引が取り込まれます。

次のステップ

コンシューマーを実装する