Amazon Kinesis Data Streams
開発者ガイド

KPL を使用した Kinesis Data Stream ストリームへの書き込み

以下のセクションでは、最もシンプルな「最低限」のプロデューサーから完全に非同期なコードまで順にサンプルコードを示します。

最低限のプロデューサーコード

次のコードは、最小限の機能するプロデューサーを書くために必要なものがすべて含まれています。Kinesis Producer Library (KPL) ユーザーレコードはバックグラウンドで処理されます。

// KinesisProducer gets credentials automatically like // DefaultAWSCredentialsProviderChain. // It also gets region automatically from the EC2 metadata service. KinesisProducer kinesis = new KinesisProducer(); // Put some records for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block kinesis.addUserRecord("myStream", "myPartitionKey", data); } // Do other stuff ...

結果に対する同期的な応答

前のコード例では、KPL ユーザーレコードが成功したかどうかをチェックしませんでした。KPL は、失敗に対処するために必要な再試行を実行します。ただし、結果を確認する必要がある場合には、次の例 (わかりやすくするため前の例を使用しています) のように、addUserRecord から返される Future オブジェクトを使用して結果を確認します。

KinesisProducer kinesis = new KinesisProducer(); // Put some records and save the Futures List<Future<UserRecordResult>> putFutures = new LinkedList<Future<UserRecordResult>>(); for (int i = 0; i < 100; i++) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); // doesn't block putFutures.add( kinesis.addUserRecord("myStream", "myPartitionKey", data)); } // Wait for puts to finish and check the results for (Future<UserRecordResult> f : putFutures) { UserRecordResult result = f.get(); // this does block if (result.isSuccess()) { System.out.println("Put record into shard " + result.getShardId()); } else { for (Attempt attempt : result.getAttempts()) { // Analyze and respond to the failure } } }

結果に対する非同期的な応答

前の例では、Future オブジェクトに対して get() を呼び出しているため、実行がブロックされます。実行のブロックを避ける必要がある場合には、次の例に示すように非同期コールバックを使用できます。

KinesisProducer kinesis = new KinesisProducer(); FutureCallback<UserRecordResult> myCallback = new FutureCallback<UserRecordResult>() { @Override public void onFailure(Throwable t) { /* Analyze and respond to the failure */ }; @Override public void onSuccess(UserRecordResult result) { /* Respond to the success */ }; }; for (int i = 0; i < 100; ++i) { ByteBuffer data = ByteBuffer.wrap("myData".getBytes("UTF-8")); ListenableFuture<UserRecordResult> f = kinesis.addUserRecord("myStream", "myPartitionKey", data); // If the Future is complete by the time we call addCallback, the callback will be invoked immediately. Futures.addCallback(f, myCallback); }