Amazon Kinesis Data Streams
開発者ガイド

ステップ 4: プロデューサーを実装する

チュートリアル: Kinesis Data Streams を使用した株式データのリアルタイム分析 のアプリケーションでは、株式市場取引をモニタリングする実際のシナリオが使用されます。次の原理によって、このシナリオをプロデューサーおよびサポートコード構造にマッピングすることができます。

ソースコードを参照し、次の情報を確認してください。

StockTrade クラス

株式取引は、StockTrade クラスのインスタンスによって個別に表されます。このインスタンスには、ティッカーシンボル、株価、株数、取引のタイプ (買いまたは売り)、取引を一意に識別する ID などの属性が含まれます。このクラスは、既に実装されています。

ストリームレコード

ストリームとは、一連のレコードのことです。レコードとは、JSON 形式による連続する StockTrade インスタンスの 1 つを表しています。(例:

{ "tickerSymbol": "AMZN", "tradeType": "BUY", "price": 395.87, "quantity": 16, "id": 3567129045 }
StockTradeGenerator クラス

StockTradeGenerator には、呼び出されるたびにランダムに生成された新しい株式取引を返す、getRandomTrade() と呼ばれるメソッドが含まれています。このクラスは、既に実装されています。

StockTradesWriter クラス

プロデューサーの main メソッドである StockTradesWriter は、継続的にランダム取引を取得し、次のタスクを実行してそれらを Kinesis Data Streams に送信します。

  1. ストリーム名とリージョン名を入力として読み取ります。

  2. AmazonKinesisClientBuilder を作成します。

  3. クライアントビルダーを使用してリージョン、認証情報、およびクライアント構成を設定します。

  4. クライアントビルダーを使用して AmazonKinesis クライアントを構成します。

  5. ストリームが存在し、アクティブであることを確認します(そうでない場合は、エラーで終了します)。

  6. 連続ループで、StockTradeGenerator.getRandomTrade() メソッドに続き sendStockTrade メソッドを呼び出して、100 ミリ秒ごとに取引をストリームに送信します。

StockTradesWriter クラスの sendStockTrade メソッドには次のコードがあります。

private static void sendStockTrade(StockTrade trade, AmazonKinesis kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by the Jackson JSON library. if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; } LOG.info("Putting trade: " + trade.toString()); PutRecordRequest putRecord = new PutRecordRequest(); putRecord.setStreamName(streamName); // We use the ticker symbol as the partition key, explained in the Supplemental Information section below. putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes)); try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); } }

次のコードの詳細を参照してください。

  • PutRecord API はバイト配列を想定するため、trade を JSON 形式に変換する必要があります。この操作は、次の 1 行のコードによって行われます。

    byte[] bytes = trade.toJsonAsBytes();
  • 取引を送信する前に、新しい PutRecordRequest インスタンス(この場合、putRecord と呼ばれる)を作成する必要があります。

    PutRecordRequest putRecord = new PutRecordRequest();

    PutRecord の呼び出しには、ストリーム名、パーティションキー、およびデータ BLOB が必要です。次のコードによって、setXxxx() メソッドを使用して、これらのフィールドを putRecord オブジェクトに追加します。

    putRecord.setStreamName(streamName); putRecord.setPartitionKey(trade.getTickerSymbol()); putRecord.setData(ByteBuffer.wrap(bytes));

    この例では、株式チケットをパーティションキーとして使用することで、レコードを特定のシャードにマッピングしています。実際には、レコードがストリーム全体に均等に分散するように、シャード 1 つあたりに数百個または数千個のパーティションキーを用意する必要があります。ストリームにデータを追加する方法の詳細については、「ストリームへのデータの追加」を参照してください。

    次に、putRecord をクライアントに送信(put オペレーション)することができます。

    kinesisClient.putRecord(putRecord);
  • エラーチェックとログ記録は、いつでも追加して損はありません。次のコードによって、エラー状態を記録します。

    if (bytes == null) { LOG.warn("Could not get JSON bytes for stock trade"); return; }

    put オペレーションの前後に try/catch ブロックを追加します。

    try { kinesisClient.putRecord(putRecord); } catch (AmazonClientException ex) { LOG.warn("Error sending record to Amazon Kinesis.", ex); }

    これは、ネットワークエラーや、ストリームがスループット限界を超えて抑制されたために Kinesis Data Streams put オペレーションが失敗することがあるためです。データが失われることがないように、単純な再試行として使用するなど、put オペレーションの再試行ポリシーを慎重に検討することをお勧めします。

  • ステータスのログ記録()は有益ですが、オプションです。

    LOG.info("Putting trade: " + trade.toString());

ここに示されているプロデューサーでは、Kinesis Data Streams API のシングルレコード機能(PutRecord)が使用されています。実際には、個々のプロデューサーで大量のレコードが生成される場合があります。その場合、PutRecords のマルチレコード機能を使用して、レコードのバッチを一度に送信する方が効率的です。詳細については、「ストリームへのデータの追加」を参照してください。

プロデューサーを実行するには

  1. 前のステップ(IAM ユーザーを作成したとき)で取得したアクセスキーとシークレットキーのペアがファイル ~/.aws/credentials に保存されていることを確認します。

  2. 次の引数を指定して StockTradeWriter クラスを実行します。

    StockTradeStream us-west-2

    us-west-2 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

次のような出力が表示されます。

Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 8: SELL 996 shares of BUD for $124.18 Feb 16, 2015 3:53:00 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 9: BUY 159 shares of GE for $20.85 Feb 16, 2015 3:53:01 PM com.amazonaws.services.kinesis.samples.stocktrades.writer.StockTradesWriter sendStockTrade INFO: Putting trade: ID 10: BUY 322 shares of WMT for $90.08

Kinesis Data Streams によって株式取引ストリームが取り込まれます。

次のステップ

ステップ 5: コンシューマーを実装する

このページの内容: