使用 AWS SDK for Java 2.x 处理分页结果 - AWS SDK for Java 2.x

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 AWS SDK for Java 2.x 处理分页结果

当响应对象太大而无法在单个响应中返回时,许多 AWS 操作都会返回分页结果。在 AWS SDK for Java 1.0 中,响应包含一个用于检索下一页结果的标记。相比之下, AWS SDK for Java 2.x 具有自动分页方法,可以进行多次服务调用,自动为您获取下一页的结果。您只需编写处理结果的代码。自动分页功能适用于同步和异步客户端。

注意

这些代码片段假设您了解使用的基础知识SDK,并且已将您的环境配置为单点登录访问权限

同步分页

以下示例演示列出 Amazon S3 桶中对象的同步分页方法。

迭代页面

第一个示例演示了如何使用分listRes页器对象(一个ListObjectsV2Iterable实例)来使用该方法遍历所有响应页面。stream代码在响应页面上进行流式传输,将响应流转换为S3Object内容流,然后处理 Amazon S3 对象的内容。

以下导入适用于此同步分页部分中的所有示例。

import java.io.IOException; import java.nio.ByteBuffer; import java.util.Random; import software.amazon.awssdk.core.waiters.WaiterResponse; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.s3.S3Client; import software.amazon.awssdk.services.s3.paginators.ListObjectsV2Iterable; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.services.s3.model.S3Exception; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.ListObjectsV2Request; import software.amazon.awssdk.services.s3.model.ListObjectsV2Response; import software.amazon.awssdk.services.s3.model.S3Object; import software.amazon.awssdk.services.s3.model.GetObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteObjectRequest; import software.amazon.awssdk.services.s3.model.DeleteBucketRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.CreateMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompletedMultipartUpload; import software.amazon.awssdk.services.s3.model.CreateBucketRequest; import software.amazon.awssdk.services.s3.model.CompletedPart; import software.amazon.awssdk.services.s3.model.CreateBucketConfiguration; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; import software.amazon.awssdk.services.s3.waiters.S3Waiter; import software.amazon.awssdk.services.s3.model.HeadBucketRequest; import software.amazon.awssdk.services.s3.model.HeadBucketResponse;
ListObjectsV2Request listReq = ListObjectsV2Request.builder() .bucket(bucketName) .maxKeys(1) .build(); ListObjectsV2Iterable listRes = s3.listObjectsV2Paginator(listReq); // Process response pages listRes.stream() .flatMap(r -> r.contents().stream()) .forEach(content -> System.out .println(" Key: " + content.key() + " size = " + content.size()));

请参阅上的完整示例 GitHub。

迭代对象

以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。ListObjectsV2Iterable 类的 contents 方法返回一个 SdkIterable,它提供了几种处理底层内容元素的方法。

使用流

以下代码段在响应内容上使用 stream 方法来迭代分页项目集合。

// Helper method to work with paginated collection of items directly. listRes.contents().stream() .forEach(content -> System.out .println(" Key: " + content.key() + " size = " + content.size()));

请参阅上的完整示例 GitHub。

使用 for-each 循环

由于 SdkIterable 扩展了 Iterable 接口,因此您可以像处理任何 Iterable 一样处理内容。以下代码段使用标准 for-each 循环迭代响应的内容。

for (S3Object content : listRes.contents()) { System.out.println(" Key: " + content.key() + " size = " + content.size()); }

请参阅上的完整示例 GitHub。

手动分页

如果您的使用案例需要手动分页,则手动分页仍然可用。对后续请求使用响应对象中的下一个令牌。以下示例使用 while 循环。

ListObjectsV2Request listObjectsReqManual = ListObjectsV2Request.builder() .bucket(bucketName) .maxKeys(1) .build(); boolean done = false; while (!done) { ListObjectsV2Response listObjResponse = s3.listObjectsV2(listObjectsReqManual); for (S3Object content : listObjResponse.contents()) { System.out.println(content.key()); } if (listObjResponse.nextContinuationToken() == null) { done = true; } listObjectsReqManual = listObjectsReqManual.toBuilder() .continuationToken(listObjResponse.nextContinuationToken()) .build(); }

请参阅上的完整示例 GitHub。

异步分页

以下示例演示了列出 DynamoDB 表格的异步分页方法。

迭代表名称页面

以下两个示例使用异步 DynamoDB 客户端,该客户端调用listTablesPaginator该方法并请求获取。ListTablesPublisher ListTablesPublisher实现了两个接口,这为处理响应提供了许多选项。我们将研究每个接口的方法。

使用 Subscriber

以下代码示例演示如何使用 ListTablesPublisher 实现的 org.reactivestreams.Publisher 接口处理分页结果。要了解有关响应式流模型的更多信息,请参阅 React ive St GitHub reams 存储库

以下导入适用于此异步分页部分中的所有示例。

import io.reactivex.rxjava3.core.Flowable; import org.reactivestreams.Subscriber; import org.reactivestreams.Subscription; import reactor.core.publisher.Flux; import software.amazon.awssdk.core.async.SdkPublisher; import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient; import software.amazon.awssdk.services.dynamodb.model.ListTablesRequest; import software.amazon.awssdk.services.dynamodb.model.ListTablesResponse; import software.amazon.awssdk.services.dynamodb.paginators.ListTablesPublisher; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException;

以下代码获取一个 ListTablesPublisher 实例。

// Creates a default client with credentials and region loaded from the // environment. final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build(); ListTablesPublisher publisher = asyncClient.listTablesPaginator(listTablesRequest);

以下代码使用 org.reactivestreams.Subscriber 的匿名实现来处理每个页面的结果。

onSubscribe 方法将调用 Subscription.request 方法来对来自发布者的数据启动请求。必须调用此方法以开始从发布者获取数据。

订阅者的 onNext 方法将处理响应页面,它会访问所有表名称并打印出每个表名称。处理完该页面后,会向发布者请求另一个页面。将重复调用该方法,直到检索了所有页面。

如果检索数据时出现错误,将触发 onError 方法。最后,在 onComplete 方法在请求所有页面后调用。

// A Subscription represents a one-to-one life-cycle of a Subscriber subscribing // to a Publisher. publisher.subscribe(new Subscriber<ListTablesResponse>() { // Maintain a reference to the subscription object, which is required to request // data from the publisher. private Subscription subscription; @Override public void onSubscribe(Subscription s) { subscription = s; // Request method should be called to demand data. Here we request a single // page. subscription.request(1); } @Override public void onNext(ListTablesResponse response) { response.tableNames().forEach(System.out::println); // After you process the current page, call the request method to signal that // you are ready for next page. subscription.request(1); } @Override public void onError(Throwable t) { // Called when an error has occurred while processing the requests. } @Override public void onComplete() { // This indicates all the results are delivered and there are no more pages // left. } });

请参阅上的完整示例 GitHub。

使用 Consumer

ListTablesPublisher 实现的 SdkPublisher 接口有一个 subscribe 方法,该方法接受 Consumer 并返回 CompletableFuture<Void>

此接口中的 subscribe 方法可用于 org.reactivestreams.Subscriber 开销可能过大的简单用例。当下面的代码使用每个页面时,它会在每个页面上调用 tableNames 方法。该 tableNames 方法返回使用 forEach 方法处理的 DynamoDB 表名的 java.util.List

// Use a Consumer for simple use cases. CompletableFuture<Void> future = publisher.subscribe( response -> response.tableNames() .forEach(System.out::println));

请参阅上的完整示例 GitHub。

迭代表名称

以下示例演示了迭代响应中返回的对象(而不是响应的页面)的方法。与之前用 contents 方法演示的同步 Amazon S3 示例类似,DynamoDB 异步结果类 ListTablesPublisher 具有与底层项目集合交互的 tableNames 便捷方法。此 tableNames 方法的返回类型是一个 SdkPublisher,可用于跨所有页面请求项目。

使用 Subscriber

以下代码获取表名底层集合的 SdkPublisher

// Create a default client with credentials and region loaded from the // environment. final DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesRequest listTablesRequest = ListTablesRequest.builder().limit(3).build(); ListTablesPublisher listTablesPublisher = asyncClient.listTablesPaginator(listTablesRequest); SdkPublisher<String> publisher = listTablesPublisher.tableNames();

以下代码使用 org.reactivestreams.Subscriber 的匿名实现来处理每个页面的结果。

订阅者的 onNext 方法将处理集合中的单个元素。在本例中,它是一个表名称。处理完该表名称后,会向发布者请求另一个表名称。将重复调用该方法,直到检索了所有表名称。

// Use a Subscriber. publisher.subscribe(new Subscriber<String>() { private Subscription subscription; @Override public void onSubscribe(Subscription s) { subscription = s; subscription.request(1); } @Override public void onNext(String tableName) { System.out.println(tableName); subscription.request(1); } @Override public void onError(Throwable t) { } @Override public void onComplete() { } });

请参阅上的完整示例 GitHub。

使用 Consumer

以下示例使用 SdkPublishersubscribe 方法(采用 Consumer)来处理每个项目。

// Use a Consumer. CompletableFuture<Void> future = publisher.subscribe(System.out::println); future.get();

请参阅上的完整示例 GitHub。

使用第三方库

您可以使用其他第三方库,而不是实现自定义订阅者。此示例演示了的用法 RxJava,但是可以使用任何实现响应式流接口的库。有关该库的更多信息, GitHub请参阅上的 RxJava wiki 页面

要使用该库,请将其作为依赖项添加。如果使用 Maven,则该示例将显示要使用的POM代码片段。

POM参赛作品

<dependency> <groupId>io.reactivex.rxjava3</groupId> <artifactId>rxjava</artifactId> <version>3.1.6</version> </dependency>

代码

DynamoDbAsyncClient asyncClient = DynamoDbAsyncClient.create(); ListTablesPublisher publisher = asyncClient.listTablesPaginator(ListTablesRequest.builder() .build()); // The Flowable class has many helper methods that work with // an implementation of an org.reactivestreams.Publisher. List<String> tables = Flowable.fromPublisher(publisher) .flatMapIterable(ListTablesResponse::tableNames) .toList() .blockingGet(); System.out.println(tables);

请参阅上的完整示例 GitHub。