本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
处理分页结果:扫描和查询
DynamoDB 增强型客户端 API 的 scan
、query
和 batch
方法返回包含一个或多个页面 的响应。一个页面包含一个或多个项目。您的代码可以按页处理响应,也可以处理单个项目。
同步DynamoDbEnhancedClient
客户端返回的分页响应返回一个PageIterableDynamoDbEnhancedAsyncClient
返回一个PagePublisher
本节介绍如何处理分页结果,并提供使用扫描和查询 APIs的示例。
扫描表
SDK 的 scan
首先,我们通过查看同步映射类的scan
方法来探索PageIterable
接口DynamoDbTable
使用同步 API
以下示例演示使用表达式scan
方法。ProductCatalog是前面显示的模型对象。
在评论行 2 之后显示的筛选表达式将退回的ProductCatalog
商品限制为价格在 8.00 到 80.00 之间(含)的商品。
此示例还使用注释行 1 后面显示的attributesToProject
方法排除这些isbn
值。
在注释第 3 行之后pagedResults
,scan
方法返回PageIterable
对象。PageIterable
的 stream
方法返回一个 java.util.Stream
从注释行 4 开始,该示例演示访问 ProductCatalog
项目的两种变体。注释行 4a 之后的版本通过每个页面进行流式传输,并对每页上的项目进行排序和记录。注释行 4b 之后的版本会跳过页面迭代并直接访问项目。
由于 PageIterable
接口有两个父接口(java.lang.Iterable
SdkIterable
Iterable
引入了 forEach
、iterator
和 spliterator
方法,而 SdkIterable
引入了 stream
方法。
public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) { Map<String, AttributeValue> expressionValues = Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(80.00)); ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) // 1. the 'attributesToProject()' method allows you to specify which values you want returned. .attributesToProject("id", "title", "authors", "price") // 2. Filter expression limits the items returned that match the provided criteria. .filterExpression(Expression.builder() .expression("price >= :min_value AND price <= :max_value") .expressionValues(expressionValues) .build()) .build(); // 3. A PageIterable object is returned by the scan method. PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request); logger.info("page count: {}", pagedResults.stream().count()); // 4. Log the returned ProductCatalog items using two variations. // 4a. This version sorts and logs the items of each page. pagedResults.stream().forEach(p -> p.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) )); // 4b. This version sorts and logs all items for all pages. pagedResults.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) ); }
使用异步 API
异步 scan
方法将结果作为 PagePublisher
对象返回。PagePublisher
接口有两种 subscribe
方法可用于处理响应页面。一种 subscribe
方法来自 org.reactivestreams.Publisher
父接口。要使用第一个选项处理页面,请向 subscribe
方法传递一个 Subscriber
实例。接下来的第一个示例演示了 subscribe
方法的用法。
第二种subscribe
方法来自接SdkPublishersubscribe
接受 Consumer
Subscriber
。此 subscribe
方法变体如接下来的第二个示例所示。
以下示例演示 scan
方法的异步版本,该版本使用了上一个示例中的相同筛选表达式。
在注释行 3 之后,DynamoDbAsyncTable.scan
返回一个 PagePublisher
对象。在下一行,代码创建一个 org.reactivestreams.Subscriber
接口实例 ProductCatalogSubscriber
,在注释行 4 之后,该实例订阅到 PagePublisher
。
在 ProductCatalogSubscriber
类示例的注释行 8 之后,Subscriber
对象从 onNext
方法中的每个页面收集 ProductCatalog
项目。这些项目存储在私有 List
变量中,并在调用代码中使用 ProductCatalogSubscriber.getSubscribedItems()
方法对它们进行访问。这是在注释行 5 之后调用的。
检索了列表后,代码按价格对所有 ProductCatalog
项目进行排序并记录每个项目。
ProductCatalogSubscriber
类CountDownLatch
public static void scanAsync(DynamoDbAsyncTable productCatalog) { ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) .attributesToProject("id", "title", "authors", "price") .filterExpression(Expression.builder() // 1. :min_value and :max_value are placeholders for the values provided by the map .expression("price >= :min_value AND price <= :max_value") // 2. Two values are needed for the expression and each is supplied as a map entry. .expressionValues( Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(400_000.00))) .build()) .build(); // 3. A PagePublisher object is returned by the scan method. PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request); ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber(); // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher. pagePublisher.subscribe(subscriber); // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber. subscriber.getSubscribedItems().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString())); // 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. // 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. }
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> { private CountDownLatch latch = new CountDownLatch(1); private Subscription subscription; private List<ProductCatalog> itemsFromAllPages = new ArrayList<>(); @Override public void onSubscribe(Subscription sub) { subscription = sub; subscription.request(1L); try { latch.await(); // Called by main thread blocking it until latch is released. } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void onNext(Page<ProductCatalog> productCatalogPage) { // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page. itemsFromAllPages.addAll(productCatalogPage.items()); subscription.request(1L); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { latch.countDown(); // Call by subscription thread; latch releases. } List<ProductCatalog> getSubscribedItems() { return this.itemsFromAllPages; } }
以下代码段示例使用的 PagePublisher.subscribe
方法版本在注释行 6 之后接受 Consumer
。Java lambda 参数使用页面,进一步处理每个项目。在此示例中,对每个页面进行处理,并对每页上的项目进行排序和记录。
// 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
PagePublisher
的 items
方法对模型实例进行解包,以便您的代码可以直接处理这些项目。以下代码段演示了这种方法。
// 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
查询表
您可以使用 DynamoDB 增强版客户端来查询您的表并检索多个符合特定条件的项目。该query()
@DynamoDbPartitionKey
和可选@DynamoDbSortKey
注解,根据主键值查找项目。
该query()
方法需要分区键值,并且可以选择接受排序键条件以进一步优化结果。与 scan
API 一样,PageIterable
对于同步调用,查询会返回 a,PagePublisher
对于异步调用,则返回 a。
Query
方法示例
下面的 query()
方法代码示例使用 MovieActor
类。数据类定义了一个复合主键,该主键由作为分区键的movie
属性以及作为排序键的actor
属性组成。
package org.example.tests.model; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey; import java.util.Objects; @DynamoDbBean public class MovieActor implements Comparable<MovieActor> { private String movieName; private String actorName; private String actingAward; private Integer actingYear; private String actingSchoolName; @DynamoDbPartitionKey @DynamoDbAttribute("movie") public String getMovieName() { return movieName; } public void setMovieName(String movieName) { this.movieName = movieName; } @DynamoDbSortKey @DynamoDbAttribute("actor") public String getActorName() { return actorName; } public void setActorName(String actorName) { this.actorName = actorName; } @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year") @DynamoDbAttribute("actingaward") public String getActingAward() { return actingAward; } public void setActingAward(String actingAward) { this.actingAward = actingAward; } @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"}) @DynamoDbAttribute("actingyear") public Integer getActingYear() { return actingYear; } public void setActingYear(Integer actingYear) { this.actingYear = actingYear; } @DynamoDbAttribute("actingschoolname") public String getActingSchoolName() { return actingSchoolName; } public void setActingSchoolName(String actingSchoolName) { this.actingSchoolName = actingSchoolName; } @Override public String toString() { final StringBuffer sb = new StringBuffer("MovieActor{"); sb.append("movieName='").append(movieName).append('\''); sb.append(", actorName='").append(actorName).append('\''); sb.append(", actingAward='").append(actingAward).append('\''); sb.append(", actingYear=").append(actingYear); sb.append(", actingSchoolName='").append(actingSchoolName).append('\''); sb.append('}'); return sb.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; MovieActor that = (MovieActor) o; return Objects.equals(movieName, that.movieName) && Objects.equals(actorName, that.actorName) && Objects.equals(actingAward, that.actingAward) && Objects.equals(actingYear, that.actingYear) && Objects.equals(actingSchoolName, that.actingSchoolName); } @Override public int hashCode() { return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName); } @Override public int compareTo(MovieActor o) { if (this.movieName.compareTo(o.movieName) != 0){ return this.movieName.compareTo(o.movieName); } else { return this.actorName.compareTo(o.actorName); } } }
对以下项目进行查询之后的代码示例。
MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'} MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'} MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'} MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'} MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'} MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'} MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}
以下代码定义了两个QueryConditional
实例:keyEqual
(在注释行 1 之后)和sortGreaterThanOrEqualTo
(在注释行 1a 之后)。
按分区键查询项目
该keyEqual
实例匹配分区键值为的项目movie01
。
此示例还在注释行 2 之后定义了一个筛选表达式,用于过滤掉任何没有actingschoolname
值的项目。
QueryEnhancedRequest
组合了查询的关键条件和筛选表达式。
public static void query(DynamoDbTable movieActorTable) { // 1. Define a QueryConditional instance to return items matching a partition value. QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01")); // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria. QueryConditional sortGreaterThanOrEqualTo = QueryConditional.sortGreaterThanOrEqualTo(b -> b.partitionValue("movie01").sortValue("actor2")); // 2. Define a filter expression that filters out items whose attribute value is null. final Expression filterOutNoActingschoolname = Expression.builder().expression("attribute_exists(actingschoolname)").build(); // 3. Build the query request. QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(keyEqual) .filterExpression(filterOutNoActingschoolname) .build(); // 4. Perform the query using the "keyEqual" conditional and filter expression. PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery); logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages. pagedResults.items().stream() .sorted() .forEach( item -> logger.info(item.toString()) // Log the sorted list of items. );
例 — 使用条件keyEqual
查询输出
下面是运行该方法的输出。该输出显示 movieName
值为 movie01 的项目,不显示 actingSchoolName
等于 null
的项目。
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}
按分区键和排序键查询项目
通过为sortGreaterThanOrEqualTo
QueryConditional
大于或等于 actor2 的值添加排序键条件来细化分区键匹配 (movie01)。
QueryConditional
以开头的@@ 方法sort
需要分区键值才能匹配,并通过基于排序键值的比较来进一步完善查询。 Sort
在方法名称中并不意味着结果已排序,而是将使用排序键值进行比较。
在以下代码段中,我们更改了之前在注释行 3 之后显示的查询请求。此片段将 “keyEqual” 条件查询替换为注释行 1a 之后定义的 sortGreaterThan OrEqualTo “” 查询条件。以下代码还删除了筛选表达式。
QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(sortGreaterThanOrEqualTo).build();
例 — 使用条件sortGreaterThanOrEqualTo
查询输出
以下输出显示了查询的结果。该查询仅返回 movieName
值等于 movie01 且 actorName
值大于或等于 actor2 的项目。由于我们移除了过滤器,因此查询返回的项目没有该actingSchoolName
属性值。
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}