Work with paginated results: scans and queries - AWS SDK for Java 2.x


Work with paginated results: scans and queries

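The scan, query, and batch methods of the DynamoDB Enhanced Client API return responses with one or more pages. Each page contains a list of items. Your code can process the response page by page, or it can process individual items.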

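A paginated response from the synchronous DynamoDbEnhancedClient is returned as a PageIterable object, whereas a paginated response from the asynchronous DynamoDbEnhancedAsyncClient is returned as a PagePublisher object.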
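To make the page/item distinction concrete, here is a pure-Java sketch that does not use the SDK. SimplePage and PagesVsItems are hypothetical stand-ins invented for illustration. Per-page processing produces one result per page. Flattening the pages, which is conceptually what items() on a PageIterable or PagePublisher provides, produces one result per item.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for a page of results (not the SDK's Page class).
final class SimplePage<T> {
    private final List<T> items;

    SimplePage(List<T> items) {
        this.items = List.copyOf(items);
    }

    List<T> items() {
        return items;
    }
}

class PagesVsItems {
    // Per-page processing: one result per page (here, the number of items on that page).
    static <T> List<Integer> itemCountsPerPage(List<SimplePage<T>> pages) {
        return pages.stream()
                .map(page -> page.items().size())
                .collect(Collectors.toList());
    }

    // Per-item processing: flatten all pages into a single list of items.
    static <T> List<T> allItems(List<SimplePage<T>> pages) {
        return pages.stream()
                .flatMap(page -> page.items().stream())
                .collect(Collectors.toList());
    }
}
```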

This section looks at processing paginated results and provides examples that use the scan and query APIs.

Scan a table

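The SDK's scan method corresponds to the DynamoDB operation of the same name. The DynamoDB Enhanced Client API offers the same options, but it uses a familiar object model and handles the pagination for you.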

First, we explore the PageIterable interface by looking at the scan method of the synchronous mapping class, DynamoDbTable.

Use the synchronous API

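The following example shows the scan method using an expression to filter the items that are returned. ProductCatalog is the model object that was shown earlier.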

The filter expression shown after comment line 2 limits the returned ProductCatalog items to those with a price between 8.00 and 80.00, inclusive.
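The condition price >= :min_value AND price <= :max_value includes both ends of the range. DynamoDB's price BETWEEN :min_value AND :max_value form is also inclusive. The pure-Java sketch below mirrors that predicate. It also models one more point: DynamoDB applies a filter expression after it reads each page, so a filtered page can hold fewer items than were read, or none at all. PriceFilterSketch and its methods are hypothetical names. Prices are plain double values here rather than whatever type the ProductCatalog model uses.

```java
import java.util.List;
import java.util.Map;
import java.util.function.DoublePredicate;
import java.util.stream.Collectors;

class PriceFilterSketch {
    // Mirrors "price >= :min_value AND price <= :max_value": both bounds are inclusive.
    static DoublePredicate priceBetween(Map<String, Double> expressionValues) {
        double min = expressionValues.get(":min_value");
        double max = expressionValues.get(":max_value");
        return price -> price >= min && price <= max;
    }

    // Applies the filter page by page, the way a server-side filter is applied to each
    // page after it is read; a page is kept even when the filter empties it.
    static List<List<Double>> filterPages(List<List<Double>> pages, DoublePredicate keep) {
        return pages.stream()
                .map(page -> page.stream()
                        .filter(price -> keep.test(price))
                        .collect(Collectors.toList()))
                .collect(Collectors.toList());
    }
}
```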

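This example also excludes isbn values. The attributesToProject method shown after comment line 1 lists only the attributes to return (id, title, authors, and price).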
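Projection is easiest to picture on a raw attribute map. The sketch below is hypothetical (ProjectionSketch.project is not an SDK method). It keeps only the requested attribute names, which is why isbn is missing from the result. With the enhanced client, as we understand it, an attribute that is not projected stays unset (null) on the mapped ProductCatalog object.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ProjectionSketch {
    // Returns a copy of the item that contains only the requested attribute names,
    // similar in spirit to the names passed to attributesToProject().
    static Map<String, Object> project(Map<String, Object> item, List<String> attributeNames) {
        Map<String, Object> projected = new LinkedHashMap<>();
        for (String name : attributeNames) {
            if (item.containsKey(name)) {
                projected.put(name, item.get(name));
            }
        }
        return projected;
    }
}
```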

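After comment line 3, pagedResults holds the PageIterable object that the scan method returns. The stream method of PageIterable returns a java.util.stream.Stream object that you can use to process the pages. In this example, the number of pages is counted and logged.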
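Counting pages with stream().count() has to walk every page. For a lazily paginated source, each walk fetches the pages again. The pure-Java simulation below illustrates this under stated assumptions: LazyPagesSketch is hypothetical, fetchPage stands in for a service call, and a null next key plays the role of an absent LastEvaluatedKey. We believe the SDK's paginated iterables behave similarly, issuing requests again on each new iteration. If so, the page count and variations 4a and 4b in the scanSync example may each run the scan. Verify this against the SDK version you use before relying on it.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical simulation of a lazily paginated result; not SDK code. Assumes pageSize > 0.
class LazyPagesSketch implements Iterable<List<Integer>> {
    private final List<Integer> table; // backing data for the simulation
    private final int pageSize;
    private int requests = 0;          // number of simulated service calls so far

    LazyPagesSketch(List<Integer> table, int pageSize) {
        this.table = List.copyOf(table);
        this.pageSize = pageSize;
    }

    int requests() {
        return requests;
    }

    // Simulated service call that returns one page starting at startKey.
    private List<Integer> fetchPage(int startKey) {
        requests++;
        int end = Math.min(startKey + pageSize, table.size());
        return table.subList(startKey, end);
    }

    @Override
    public Iterator<List<Integer>> iterator() {
        // Every new iterator starts again from the first page.
        return new Iterator<List<Integer>>() {
            private Integer nextKey = 0; // null plays the role of an absent LastEvaluatedKey

            @Override
            public boolean hasNext() {
                return nextKey != null;
            }

            @Override
            public List<Integer> next() {
                if (nextKey == null) {
                    throw new NoSuchElementException();
                }
                List<Integer> page = fetchPage(nextKey);
                int after = nextKey + page.size();
                nextKey = after < table.size() ? after : null;
                return page;
            }
        };
    }

    Stream<List<Integer>> stream() {
        return StreamSupport.stream(spliterator(), false);
    }
}
```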

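Starting at comment line 4, the example shows two ways to access the ProductCatalog items. The version after comment line 4a streams through each page, then sorts and logs the items on that page. The version after comment line 4b skips page iteration and accesses the items directly.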
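The difference between 4a and 4b is the scope of the sort. The hypothetical helpers below use integers in place of prices. Sorting inside each page orders items only within that page. Sorting the flattened items produces one global order, but every item must be held in memory, because sorted() has to see the whole stream before it emits anything.

```java
import java.util.List;
import java.util.stream.Collectors;

class SortScopeSketch {
    // Like variation 4a: each page is sorted on its own, then the pages are concatenated.
    static List<Integer> sortWithinPages(List<List<Integer>> pages) {
        return pages.stream()
                .flatMap(page -> page.stream().sorted())
                .collect(Collectors.toList());
    }

    // Like variation 4b: all items from all pages are sorted together.
    static List<Integer> sortAcrossPages(List<List<Integer>> pages) {
        return pages.stream()
                .flatMap(List::stream)
                .sorted()
                .collect(Collectors.toList());
    }
}
```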

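The PageIterable interface offers several ways to process results because it has two parent interfaces, java.lang.Iterable and SdkIterable. Iterable contributes the forEach, iterator, and spliterator methods, and SdkIterable contributes the stream method.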
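All four entry points are available on any type that extends Iterable and adds a stream() default method. The sketch below uses a hypothetical StreamableIterable interface that builds stream() on top of Iterable through StreamSupport. This is one common way to write such a method; it is not the SDK's SdkIterable source. Each call starts a fresh iterator, so all four passes see the same pages.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

// Hypothetical interface: an Iterable that adds a stream() default method.
interface StreamableIterable<T> extends Iterable<T> {
    default Stream<T> stream() {
        return StreamSupport.stream(spliterator(), false);
    }
}

class FourWaysSketch {
    // Visits the same source through forEach, iterator, spliterator, and stream.
    static List<String> visitFourWays(StreamableIterable<String> pages) {
        List<String> seen = new ArrayList<>();
        pages.forEach(p -> seen.add("forEach:" + p));
        Iterator<String> it = pages.iterator();
        while (it.hasNext()) {
            seen.add("iterator:" + it.next());
        }
        pages.spliterator().forEachRemaining(p -> seen.add("spliterator:" + p));
        pages.stream().forEach(p -> seen.add("stream:" + p));
        return seen;
    }
}
```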

public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) {
    Map<String, AttributeValue> expressionValues = Map.of(
            ":min_value", numberValue(8.00),
            ":max_value", numberValue(80.00));

    ScanEnhancedRequest request = ScanEnhancedRequest.builder()
            .consistentRead(true)
            // 1. the 'attributesToProject()' method allows you to specify which values you want returned.
            .attributesToProject("id", "title", "authors", "price")
            // 2. Filter expression limits the items returned that match the provided criteria.
            .filterExpression(Expression.builder()
                    .expression("price >= :min_value AND price <= :max_value")
                    .expressionValues(expressionValues)
                    .build())
            .build();

    // 3. A PageIterable object is returned by the scan method.
    PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request);
    logger.info("page count: {}", pagedResults.stream().count());

    // 4. Log the returned ProductCatalog items using two variations.
    // 4a. This version sorts and logs the items of each page.
    pagedResults.stream().forEach(p -> p.items().stream()
            .sorted(Comparator.comparing(ProductCatalog::price))
            .forEach(
                    item -> logger.info(item.toString())
            ));
    // 4b. This version sorts and logs all items for all pages.
    pagedResults.items().stream()
            .sorted(Comparator.comparing(ProductCatalog::price))
            .forEach(
                    item -> logger.info(item.toString())
            );
}

Use the asynchronous API

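The asynchronous scan method returns its results as a PagePublisher object. The PagePublisher interface has two subscribe methods that you can use to process response pages. One subscribe method comes from the org.reactivestreams.Publisher parent interface. To process pages with this first option, pass a Subscriber instance to the subscribe method. The first example that follows shows how to use this subscribe method.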

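The second subscribe method comes from the SdkPublisher interface. This version of subscribe accepts a Consumer instead of a Subscriber. The second example that follows shows this variation of the subscribe method.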
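The two subscribe styles have close counterparts in the JDK's own reactive types. As an SDK-free sketch, the example below swaps in java.util.concurrent.Flow, whose Publisher, Subscriber, and Subscription interfaces have the same shape as those in org.reactivestreams. It also uses SubmissionPublisher as a stand-in publisher of pages. viaSubscriber passes a full Subscriber that requests one page at a time. viaConsumer uses SubmissionPublisher.consume, which returns a CompletableFuture<Void>, as the Consumer-based subscribe on SdkPublisher does. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class SubscribeStylesSketch {

    // Style 1: a full Subscriber that requests one page at a time (backpressure).
    static final class CollectingSubscriber implements Flow.Subscriber<List<String>> {
        final List<String> items = Collections.synchronizedList(new ArrayList<>());
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            s.request(1); // ask for the first page only
        }

        @Override
        public void onNext(List<String> page) {
            items.addAll(page);
            subscription.request(1); // then ask for the next page
        }

        @Override
        public void onError(Throwable t) {
            done.countDown(); // release the waiting caller on failure too
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    static List<String> viaSubscriber(List<List<String>> pages) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<List<String>> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            pages.forEach(publisher::submit);
        } // close() signals completion after the submitted pages are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out waiting for completion");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(subscriber.items);
    }

    // Style 2: a plain Consumer; consume() returns a CompletableFuture that can be joined.
    static List<String> viaConsumer(List<List<String>> pages) {
        List<String> items = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> finished;
        try (SubmissionPublisher<List<String>> publisher = new SubmissionPublisher<>()) {
            finished = publisher.consume(items::addAll);
            pages.forEach(publisher::submit);
        }
        finished.join(); // block until every page has been consumed
        return new ArrayList<>(items);
    }
}
```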

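The following example shows the asynchronous version of the scan method. It uses the same filter expression as the previous example, but the upper bound supplied for :max_value is 400,000.00 instead of 80.00.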

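After comment line 3, DynamoDbAsyncTable.scan returns a PagePublisher object. On the next line, the code creates ProductCatalogSubscriber, an instance of the org.reactivestreams.Subscriber interface. After comment line 4, this instance subscribes to the PagePublisher.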

ProductCatalogSubscriber 类示例的注释行 8 之后,Subscriber 对象从 onNext 方法中的每个页面收集 ProductCatalog 项目。这些项目存储在私有 List 变量中,并在调用代码中使用 ProductCatalogSubscriber.getSubscribedItems() 方法对它们进行访问。这是在注释行 5 之后调用的。

After the list is retrieved, the code sorts all ProductCatalog items by price and logs each one.

The CountDownLatch in ProductCatalogSubscriber blocks the calling thread until all items have been added to the list. Execution then continues after comment line 5.
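The latch pattern works only if every terminal signal releases the latch. The pure-Java sketch below counts down on both onComplete and onError. A failure is then reported to the waiting thread instead of blocking it indefinitely. The BlockingCollector type is hypothetical and does not implement the Reactive Streams interfaces. A plain thread simulates the publisher.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class LatchSketch {
    // Hypothetical collector that receives signals on another thread and lets the
    // calling thread block until the stream either completes or fails.
    static final class BlockingCollector {
        private final CountDownLatch latch = new CountDownLatch(1);
        private final List<String> items = new ArrayList<>();
        private volatile Throwable error;

        void onNext(String item) {
            items.add(item);
        }

        void onError(Throwable t) {
            error = t;
            latch.countDown(); // without this, a failure would block the caller forever
        }

        void onComplete() {
            latch.countDown();
        }

        List<String> awaitItems() {
            try {
                latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            if (error != null) {
                throw new IllegalStateException("stream failed", error);
            }
            return List.copyOf(items);
        }
    }

    // Simulates a producer thread that emits one item, then completes or fails.
    static List<String> collect(boolean fail) {
        BlockingCollector collector = new BlockingCollector();
        Thread producer = new Thread(() -> {
            collector.onNext("page-1 item");
            if (fail) {
                collector.onError(new RuntimeException("simulated failure"));
            } else {
                collector.onComplete();
            }
        });
        producer.start();
        return collector.awaitItems();
    }
}
```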

public static void scanAsync(DynamoDbAsyncTable<ProductCatalog> productCatalog) {
    ScanEnhancedRequest request = ScanEnhancedRequest.builder()
            .consistentRead(true)
            .attributesToProject("id", "title", "authors", "price")
            .filterExpression(Expression.builder()
                    // 1. :min_value and :max_value are placeholders for the values provided by the map
                    .expression("price >= :min_value AND price <= :max_value")
                    // 2. Two values are needed for the expression and each is supplied as a map entry.
                    .expressionValues(
                            Map.of(":min_value", numberValue(8.00),
                                    ":max_value", numberValue(400_000.00)))
                    .build())
            .build();

    // 3. A PagePublisher object is returned by the scan method.
    PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request);
    ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber();
    // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher.
    pagePublisher.subscribe(subscriber);
    // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber.
    subscriber.getSubscribedItems().stream()
            .sorted(Comparator.comparing(ProductCatalog::price))
            .forEach(item -> logger.info(item.toString()));
    // 6. Use a Consumer to work through each page.
    pagePublisher.subscribe(page -> page
                    .items().stream()
                    .sorted(Comparator.comparing(ProductCatalog::price))
                    .forEach(item -> logger.info(item.toString())))
            .join(); // If needed, blocks the calling thread until processing is finished.
    // 7. Use a Consumer to work through each ProductCatalog item.
    pagePublisher.items()
            .subscribe(product -> logger.info(product.toString()))
            .exceptionally(failure -> {
                logger.error("ERROR - ", failure);
                return null;
            })
            .join(); // If needed, blocks the calling thread until processing is finished.
}
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> {
    private CountDownLatch latch = new CountDownLatch(1);
    private Subscription subscription;
    private List<ProductCatalog> itemsFromAllPages = new ArrayList<>();

    @Override
    public void onSubscribe(Subscription sub) {
        subscription = sub;
        subscription.request(1L);
        try {
            latch.await(); // Called by main thread blocking it until latch is released.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }
    }

    @Override
    public void onNext(Page<ProductCatalog> productCatalogPage) {
        // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page.
        itemsFromAllPages.addAll(productCatalogPage.items());
        subscription.request(1L);
    }

    @Override
    public void onError(Throwable throwable) {
        // Release the latch on failure too; otherwise the calling thread would wait forever.
        logger.error("Scan failed", throwable);
        latch.countDown();
    }

    @Override
    public void onComplete() {
        latch.countDown(); // Called by the subscription thread; the latch is released.
    }

    List<ProductCatalog> getSubscribedItems() {
        return this.itemsFromAllPages;
    }
}

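The following snippet, taken from comment line 6, uses the version of the PagePublisher.subscribe method that accepts a Consumer. The Java lambda receives each page and processes its items. In this example, the items on each page are sorted and then logged.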

// 6. Use a Consumer to work through each page.
pagePublisher.subscribe(page -> page
                .items().stream()
                .sorted(Comparator.comparing(ProductCatalog::price))
                .forEach(item -> logger.info(item.toString())))
        .join(); // If needed, blocks the calling thread until processing is finished.

The items method of PagePublisher unwraps the model instances so that your code can process the items directly. The following snippet shows this approach.

// 7. Use a Consumer to work through each ProductCatalog item.
pagePublisher.items()
        .subscribe(product -> logger.info(product.toString()))
        .exceptionally(failure -> {
            logger.error("ERROR - ", failure);
            return null;
        })
        .join(); // If needed, blocks the calling thread until processing is finished.
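In the snippet above, exceptionally converts a failure into a normal null result, so the following join() returns instead of throwing a CompletionException. The pure-Java sketch below isolates that behavior with a plain CompletableFuture. RecoverSketch.joinWithRecovery is a hypothetical helper, and an AtomicReference stands in for the logger.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class RecoverSketch {
    // Records any failure, turns it into a normal null result, then waits for completion.
    // Because of exceptionally(), join() returns normally instead of throwing CompletionException.
    static AtomicReference<Throwable> joinWithRecovery(CompletableFuture<Void> processing) {
        AtomicReference<Throwable> seen = new AtomicReference<>();
        processing
                .exceptionally(failure -> {
                    seen.set(failure); // stands in for logger.error(...)
                    return null;
                })
                .join();
        return seen;
    }
}
```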

Query a table

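You can use the DynamoDB Enhanced Client to query your table and retrieve multiple items that match specific criteria. The query() method finds items by primary key value, using the @DynamoDbPartitionKey annotation and the optional @DynamoDbSortKey annotation defined on your data class.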

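The query() method requires a partition key value and can optionally accept sort key conditions to further refine the results. Like the scan API, query returns a PageIterable for synchronous calls and a PagePublisher for asynchronous calls.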
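The rules of a query's key condition can be modeled in plain Java. In the hypothetical sketch below, the partition value is an exact-match filter. The sort-key predicate is optional; pass actor -> true to skip it. Results are returned in ascending sort-key order, which is DynamoDB's default for a query. The sketch assumes ASCII string keys: DynamoDB compares strings by their UTF-8 bytes, which matches Java's compareTo for ASCII.

```java
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

class QuerySketch {
    // Hypothetical row with a composite primary key: movie (partition key) + actor (sort key).
    static final class Row {
        final String movie;
        final String actor;

        Row(String movie, String actor) {
            this.movie = movie;
            this.actor = actor;
        }
    }

    // The partition key must match exactly; the sort-key condition narrows the matches further.
    // Matching sort keys are returned in ascending order.
    static List<String> query(List<Row> rows, String partitionValue, Predicate<String> sortCondition) {
        return rows.stream()
                .filter(row -> row.movie.equals(partitionValue))
                .filter(row -> sortCondition.test(row.actor))
                .map(row -> row.actor)
                .sorted()
                .collect(Collectors.toList());
    }
}
```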

Query method examples

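The query() method examples that follow use the MovieActor class. This data class defines a composite primary key made up of the movie attribute as the partition key and the actor attribute as the sort key.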

package org.example.tests.model;

import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey;
import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey;

import java.util.Objects;

@DynamoDbBean
public class MovieActor implements Comparable<MovieActor> {

    private String movieName;
    private String actorName;
    private String actingAward;
    private Integer actingYear;
    private String actingSchoolName;

    @DynamoDbPartitionKey
    @DynamoDbAttribute("movie")
    public String getMovieName() {
        return movieName;
    }

    public void setMovieName(String movieName) {
        this.movieName = movieName;
    }

    @DynamoDbSortKey
    @DynamoDbAttribute("actor")
    public String getActorName() {
        return actorName;
    }

    public void setActorName(String actorName) {
        this.actorName = actorName;
    }

    @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year")
    @DynamoDbAttribute("actingaward")
    public String getActingAward() {
        return actingAward;
    }

    public void setActingAward(String actingAward) {
        this.actingAward = actingAward;
    }

    @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"})
    @DynamoDbAttribute("actingyear")
    public Integer getActingYear() {
        return actingYear;
    }

    public void setActingYear(Integer actingYear) {
        this.actingYear = actingYear;
    }

    @DynamoDbAttribute("actingschoolname")
    public String getActingSchoolName() {
        return actingSchoolName;
    }

    public void setActingSchoolName(String actingSchoolName) {
        this.actingSchoolName = actingSchoolName;
    }

    @Override
    public String toString() {
        final StringBuffer sb = new StringBuffer("MovieActor{");
        sb.append("movieName='").append(movieName).append('\'');
        sb.append(", actorName='").append(actorName).append('\'');
        sb.append(", actingAward='").append(actingAward).append('\'');
        sb.append(", actingYear=").append(actingYear);
        sb.append(", actingSchoolName='").append(actingSchoolName).append('\'');
        sb.append('}');
        return sb.toString();
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        MovieActor that = (MovieActor) o;
        return Objects.equals(movieName, that.movieName)
                && Objects.equals(actorName, that.actorName)
                && Objects.equals(actingAward, that.actingAward)
                && Objects.equals(actingYear, that.actingYear)
                && Objects.equals(actingSchoolName, that.actingSchoolName);
    }

    @Override
    public int hashCode() {
        return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName);
    }

    @Override
    public int compareTo(MovieActor o) {
        if (this.movieName.compareTo(o.movieName) != 0) {
            return this.movieName.compareTo(o.movieName);
        } else {
            return this.actorName.compareTo(o.actorName);
        }
    }
}

The code examples that follow query the following items.

MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'}
MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'}
MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'}
MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'}
MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}
MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'}
MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'}
MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'}
MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'}
MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'}
MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'}
MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'}
MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'}
MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'}
MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}

The following code defines two QueryConditional instances: keyEqual (after comment line 1) and sortGreaterThanOrEqualTo (after comment line 1a).

Query items by partition key

The keyEqual instance matches items with a partition key value of movie01.

After comment line 2, this example also defines a filter expression that filters out any item that has no actingschoolname value.
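attribute_exists checks whether an attribute is present on the item, not whether its value is non-null. As we understand DynamoDB's semantics, an attribute stored with the DynamoDB NULL type still exists. The items excluded from this query's example output are therefore presumably ones saved without an actingschoolname attribute at all. The sketch models items as hypothetical Java maps: a missing key means the attribute is absent, and a key mapped to null stands for a stored NULL value.

```java
import java.util.Map;
import java.util.function.Predicate;

class AttributeExistsSketch {
    // Models attribute_exists(name): true when the attribute is present on the item,
    // even if its stored value is a DynamoDB NULL (represented here by a Java null).
    static Predicate<Map<String, Object>> attributeExists(String name) {
        return item -> item.containsKey(name);
    }
}
```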

The QueryEnhancedRequest combines the key condition and the filter expression for the query.

public static void query(DynamoDbTable<MovieActor> movieActorTable) {

    // 1. Define a QueryConditional instance to return items matching a partition value.
    QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01"));
    // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria.
    QueryConditional sortGreaterThanOrEqualTo = QueryConditional.sortGreaterThanOrEqualTo(
            b -> b.partitionValue("movie01").sortValue("actor2"));
    // 2. Define a filter expression that filters out items that have no actingschoolname attribute.
    final Expression filterOutNoActingschoolname = Expression.builder()
            .expression("attribute_exists(actingschoolname)")
            .build();

    // 3. Build the query request.
    QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder()
            .queryConditional(keyEqual)
            .filterExpression(filterOutNoActingschoolname)
            .build();
    // 4. Perform the query using the "keyEqual" conditional and filter expression.
    PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery);
    logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages.

    pagedResults.items().stream()
            .sorted()
            .forEach(
                    item -> logger.info(item.toString()) // Log the sorted list of items.
            );
}

Example: Output of the query that uses the keyEqual conditional

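The following is the output from running the method. It shows the items that have a movieName value of movie01 and omits the items that have no actingschoolname attribute (shown as actingSchoolName='null' in the item list).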

2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'}
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'}
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}

Query items by partition key and sort key

The sortGreaterThanOrEqualTo QueryConditional refines the partition key match (movie01) by adding a sort key condition for values greater than or equal to actor2.

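QueryConditional methods whose names begin with sort require a partition key value to match, and they further refine the query with a comparison on the sort key value. Sort in the method name does not mean that the results are sorted; it means that the sort key value is used for the comparison.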
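A string sort key is compared character by character, not numerically. The hypothetical sketch below assumes ASCII keys and shows the consequence: actor10 sorts before actor2, so a sortGreaterThanOrEqualTo bound of actor2 would not match it. Zero-padded values (for example actor02 and actor10) or a numeric sort key avoid this surprise.

```java
import java.util.List;
import java.util.stream.Collectors;

class SortKeyComparisonSketch {
    // Models sortGreaterThanOrEqualTo for a string sort key as a lexicographic comparison
    // (assumes ASCII keys, for which Java's compareTo matches DynamoDB's byte-wise ordering).
    static List<String> greaterThanOrEqualTo(List<String> sortKeys, String bound) {
        return sortKeys.stream()
                .filter(key -> key.compareTo(bound) >= 0)
                .sorted()
                .collect(Collectors.toList());
    }
}
```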

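In the following snippet, we change the query request shown earlier after comment line 3. This snippet replaces the keyEqual query conditional with the sortGreaterThanOrEqualTo query conditional defined after comment line 1a. It also removes the filter expression.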

QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder()
        .queryConditional(sortGreaterThanOrEqualTo)
        .build();

Example: Output of the query that uses the sortGreaterThanOrEqualTo conditional

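The following output shows the results of the query. The query returns only items that have a movieName value equal to movie01 and an actorName value greater than or equal to actor2. Because the filter was removed, the query also returns items that have no actingSchoolName attribute value.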

2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'}
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'}
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}