AWS SDK for Java 2.x を使用した非同期プログラミング
AWS SDK for Java 2.x は、少数のスレッドで同時実行を実現する、ノンブロッキング I/O をサポートした非同期クライアントを備えています。ただし、ノンブロッキング I/O の合計は保証されません。非同期クライアントは、認証情報の取得、AWSSignature Version 4 (SigV4) を使用したリクエスト署名、エンドポイント検出などのケースで、ブロック呼び出しを実行する場合があります。
同期メソッドは、クライアントがサービスからのレスポンスを受信するまでスレッドの実行をブロックします。非同期メソッドはすぐに応答を返し、レスポンスを待機せずに呼び出しスレッドに制御を戻します。
非同期メソッドはレスポンスが可能になる前に応答を返すため、準備ができたらレスポンスを得るための手段が必要になります。AWS SDK for Java の 2.x における非同期クライアントのメソッドでは、準備ができるとレスポンスにアクセスすることを許可する CompletableFuture オブジェクトが返されます。
非同期クライアント API の使用
非同期クライアントメソッドの署名は同期メソッドと同じですが、非同期メソッドは、将来の非同期オペレーションの結果を含む CompletableFutureCompletionException としてスローされます。
結果を取得するために使用できる方法の 1 つは、SDK メソッド呼び出しによって返された CompletableFuture に whenComplete() メソッドを連結させることです。whenComplete() メソッドは、非同期呼び出しの完了方法に応じて、結果または CompletionException タイプのスロー可能なオブジェクトを受け取ります。呼び出し元のコードに返される前に結果を処理またはチェックするために、アクションを whenComplete() に指定します。
SDK メソッドによって返されたオブジェクト以外のものを返す場合は、代わりに handle() メソッドを使用します。handle() メソッドは whenComplete() と同じパラメータを受け取りますが、結果を処理してオブジェクトを返すことができます。
非同期チェーンが完了するのを待ち、完了結果を取得するには、 join() メソッドを呼び出します。Throwable オブジェクトがチェーンで処理されなかった場合、 join() メソッドは元の例外をラップするチェックされていない CompletionException をスローします。元の例外には CompletionException#getCause() でアクセスします。CompletableFuture#get() メソッドを呼び出して、完了結果を取得することもできます。ただし、 get() メソッドはチェック例外をスローできます。
次の例は、DynamoDB 非同期クライアントの listTables() メソッドを使用する 2 つの異なる方法を示しています。whenComplete() に渡されたアクションは成功したレスポンスをログに記録するだけですが、handle() バージョンはテーブル名のリストを抽出してリストを返します。どちらの場合も、非同期チェーンでエラーが生成されるとエラーは再スローされるため、クライアントコードによって処理が可能です。
インポート
import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import java.util.List; import java.util.concurrent.CompletableFuture;
コード
非同期メソッドでのストリーミングの処理
ストリーミングコンテンツを扱う非同期メソッドでは、コンテンツを増分で渡すには AsyncRequestBody
次の例では、非同期形式の PutObject オペレーションを使用して、ファイルを Amazon S3 に非同期的にアップロードします。
インポート
import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;
コード
/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncOps <bucketName> <key> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " key - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String key = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); PutObjectRequest objectRequest = PutObjectRequest.builder() .bucket(bucketName) .key(key) .build(); // Put the object into the bucket CompletableFuture<PutObjectResponse> future = client.putObject(objectRequest, AsyncRequestBody.fromFile(Paths.get(path)) ); future.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object uploaded. Details: " + resp); } else { // Handle error err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); future.join(); } }
次の例では、非同期形式の GetObject オペレーションを使用して、Amazon S3 からファイルを取得します。
インポート
import software.amazon.awssdk.core.async.AsyncResponseTransformer; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import java.nio.file.Paths; import java.util.concurrent.CompletableFuture;
コード
/** * To run this AWS code example, ensure that you have setup your development environment, including your AWS credentials. * * For information, see this documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class S3AsyncStreamOps { public static void main(String[] args) { final String USAGE = "\n" + "Usage:\n" + " S3AsyncStreamOps <bucketName> <objectKey> <path>\n\n" + "Where:\n" + " bucketName - the name of the Amazon S3 bucket (for example, bucket1). \n\n" + " objectKey - the name of the object (for example, book.pdf). \n" + " path - the local path to the file (for example, C:/AWS/book.pdf). \n" ; if (args.length != 3) { System.out.println(USAGE); System.exit(1); } String bucketName = args[0]; String objectKey = args[1]; String path = args[2]; Region region = Region.US_WEST_2; S3AsyncClient client = S3AsyncClient.builder() .region(region) .build(); GetObjectRequest objectRequest = GetObjectRequest.builder() .bucket(bucketName) .key(objectKey) .build(); CompletableFuture<GetObjectResponse> futureGet = client.getObject(objectRequest, AsyncResponseTransformer.toFile(Paths.get(path))); futureGet.whenComplete((resp, err) -> { try { if (resp != null) { System.out.println("Object downloaded. Details: "+resp); } else { err.printStackTrace(); } } finally { // Only close the client when you are completely done with it client.close(); } }); futureGet.join(); } }
高度な非同期オプションを設定する
AWS SDK for Java 2.x は、非同期イベント駆動型ネットワークアプリケーションフレームワークである NettyExecutorService を作成し、HTTP クライアントリクエストから Netty クライアントに返された Future を完了します。この抽象化により、開発者がスレッドを停止またはスリープすることを選択した場合、アプリケーションが非同期プロセスをブレークするリスクが軽減されます。デフォルトでは、各非同期クライアントはプロセッサ数に基づいてスレッドプールを作成し、ExecutorService 内のキュー内のタスクを管理します。
非同期クライアントを構築する際に、ExecutorService の特定の JDK 実装を指定できます。次のスニペットは、固定のスレッド数を持つ ExecutorService を作成します。
コード
S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, Executors.newFixedThreadPool(10) ) ) .build();
パフォーマンスを最適化するには、独自のスレッドプールエグゼキュターを管理し、クライアントの設定時に含めます。
ThreadPoolExecutor executor = new ThreadPoolExecutor(50, 50, 10, TimeUnit.SECONDS, new LinkedBlockingQueue<>(<custom_value>), new ThreadFactoryBuilder() .threadNamePrefix("sdk-async-response").build()); // Allow idle core threads to time out executor.allowCoreThreadTimeOut(true); S3AsyncClient clientThread = S3AsyncClient.builder() .asyncConfiguration( b -> b.advancedOption(SdkAdvancedAsyncClientOption .FUTURE_COMPLETION_EXECUTOR, executor ) ) .build();