コンシューマーを実装する - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

コンシューマーを実装する

このチュートリアルのコンシューマーアプリケーションは、データストリームの株式取引を継続的に処理します。その後、1 分ごとに売買されている最も人気のある株式を出力します。アプリケーションは Kinesis Client Library (KCL) 上に構築されており、コンシューマーアプリケーションに共通する面倒な作業の多くを行います。詳細については、「Kinesis Client Library を使用する」を参照してください。

ソースコードを参照し、次の情報を確認してください。

StockTradesProcessor クラス

以下のタスクを実行する、ユーザーに提供されるコンシューマーのメインクラス。

  • 引数として渡されたアプリケーション、データストリーム、およびリージョン名を読み取ります。

  • リージョン名でKinesisAsyncClientインスタンスを作成します。

  • ShardRecordProcessor のインスタンスとして機能し、StockTradeRecordProcessor インスタンスによって実装される、StockTradeRecordProcessorFactory インスタンスを作成します。

  • KinesisAsyncClient、、および ConfigsBuilderインスタンスを使用してStreamNameApplicationNameStockTradeRecordProcessorFactoryインスタンスを作成します。これは、デフォルト値ですべての設定を作成するのに役立ちます。

  • ConfigsBuilder インスタンスでKCLスケジューラ (以前は、KCLバージョン 1.x ではKCLワーカーと呼ばれていました) を作成します。

  • このスケジューラーは、(このコンシューマーインスタンスに割り当てられた) 各シャードに新しいスレッドを作成します。これにより、継続的にデータストリームからレコードが読み取られます。次に、StockTradeRecordProcessor インスタンスを呼び出して、受信したレコードのバッチを処理します。

StockTradeRecordProcessor クラス

StockTradeRecordProcessor インスタンスを実装したら、次は initializeprocessRecordsleaseLostshardEndedshutdownRequested の 5 つの必須メソッドを実装します。

initialize および shutdownRequestedメソッドは、レコードの受信を開始する準備ができたタイミングと、レコードの受信を停止するタイミングをそれぞれレコードプロセッサに通知KCLするために によって使用されます。これにより、アプリケーション固有のセットアップタスクと終了タスクを実行できます。 leaseLostおよび shardEndedは、リースが失われたり、処理がシャードの最後に達したりした場合に何をすべきかを示すロジックを実装するために使用されます。この例では、これらのイベントを示すメッセージをログに記録するだけです。

これらのメソッドのコードを示しています。主な処理は processRecords メソッドで行われ、そこでは各レコードの processRecord が使用されます。後者のメソッドは、ほとんどの場合、空のスケルトンコードとして提供されます。次のステップでは、これを実装する方法について説明します。詳細については、次のステップを参照してください。

また、processRecord のサポートメソッドである reportStats および resetStats の実装にも注目してください。これらのメソッドは、元のソースコードでは空になっています。

processRecords メソッドは既に実装されており、次のステップを実行します。

  • 渡されたレコードごとに processRecord を呼び出します。

  • 最後のレポートから 1 分間以上経過した場合は、reportStats() を呼び出して最新の統計を出力し、次の間隔に新しいレコードのみ含まれるように resetStats() を呼び出して統計を消去します。

  • 次のレポート時間を設定します。

  • 最後のチェックポイントから 1 分間以上経過した場合は、checkpoint() を呼び出します。

  • 次のチェックポイント時間を設定します。

このメソッドでは、60 秒間間隔でレポートおよびチェックポイント時間が設定されています。チェックポイントの詳細については、Kinesis Client Library の使用を参照してください。

StockStats クラス

このクラスでは、データを保持し、最も人気のある株式の経時的な統計を示すことができます。このコードは、事前に用意されており、次のメソッドが含まれています。

  • addStockTrade(StockTrade): 指定された StockTrade を実行中の統計に取り込みます。

  • toString(): 特定の形式の文字列として統計を返します。

このクラスは、各株式の取引の合計数と最大数を累計して、最も人気のある株式を追跡します。これらの数は、株式取引を受け取る度に更新されます。

次のステップに示されているコードを StockTradeRecordProcessor クラスのメソッドに追加します。

コンシューマーを実装するには
  1. processRecord メソッドを実装するには、サイズの正しい StockTrade オブジェクトを開始し、それにレコードデータを追加します。また、問題が発生した場合に警告がログに記録されるようにします。

    byte[] arr = new byte[record.data().remaining()]; record.data().get(arr); StockTrade trade = StockTrade.fromJsonAsBytes(arr); if (trade == null) { log.warn("Skipping record. Unable to parse record into StockTrade. Partition Key: " + record.partitionKey()); return; } stockStats.addStockTrade(trade);
  2. reportStats メソッドを実装します。好みに合わせて出力形式を変更します。

    System.out.println("****** Shard " + kinesisShardId + " stats for last 1 minute ******\n" + stockStats + "\n" + "****************************************************************\n");
  3. 新しい resetStats インスタンスを作成する stockStats メソッドを実装します。

    stockStats = new StockStats();
  4. ShardRecordProcessor インターフェイスに必要な以下のメソッドを実装します。

    @Override public void leaseLost(LeaseLostInput leaseLostInput) { log.info("Lost lease, so terminating."); } @Override public void shardEnded(ShardEndedInput shardEndedInput) { try { log.info("Reached shard end checkpointing."); shardEndedInput.checkpointer().checkpoint(); } catch (ShutdownException | InvalidStateException e) { log.error("Exception while checkpointing at shard end. Giving up.", e); } } @Override public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) { log.info("Scheduler is shutting down, checkpointing."); checkpoint(shutdownRequestedInput.checkpointer()); } private void checkpoint(RecordProcessorCheckpointer checkpointer) { log.info("Checkpointing shard " + kinesisShardId); try { checkpointer.checkpoint(); } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). log.info("Caught shutdown exception, skipping checkpoint.", se); } catch (ThrottlingException e) { // Skip checkpoint when throttled. In practice, consider a backoff and retry policy. log.error("Caught throttling exception, skipping checkpoint.", e); } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). log.error("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library.", e); } }
コンシューマーを実行するには
  1. プロデューサーの実装 で記述したプロデューサーを実行し、シミュレートした株式取引レコードをストリームに取り込みます。

  2. 以前に取得したアクセスキーとシークレットキーペア (IAMユーザーの作成時) がファイル に保存されていることを確認します~/.aws/credentials

  3. 次の引数を指定して StockTradesProcessor クラスを実行します。

    StockTradesProcessor StockTradeStream us-west-2

    us-west-2 以外のリージョンにストリームを作成した場合は、代わりにそのリージョンをここで指定する必要があります。

1 分後、次のような出力が表示されます。その後、1 分間ごとに出力が更新されます。

****** Shard shardId-000000000001 stats for last 1 minute ****** Most popular stock being bought: WMT, 27 buys. Most popular stock being sold: PTR, 14 sells. ****************************************************************

次のステップ

(オプション) コンシューマーを拡張する