本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
扫描表
SDK 的scan
首先,我们通过查看同步映射类的scan
方法来探索PageIterable
接口DynamoDbTable
使用同步 API
以下示例显示了使用表达式scan
的方法。ProductCatalog是前面显示的模型对象。
注释行 1 后显示的筛选表达式将返回的ProductCatalog
商品限制为价格值介于 8.00 到 80.00 之间的商品。
此示例还使用注释行 2 后显示的attributesToProject
方法排除了这些isbn
值。
在注释第 3 行pagedResult
,该scan
方法返回PageIterable
对象。的stream
方法PageIterable
返回一个java.util.Stream
从评论第 4 行开始,该示例显示了访问ProductCatalog
项目的两种变体。评论行 2a 之后的版本会流式传输每个页面,并对每个页面上的项目进行排序和记录。注释行 2b 之后的版本会跳过页面迭代直接访问项目。
该PageIterable
接口提供了多种处理结果的方法,因为它有两个父接口—— java.lang.Iterable
SdkIterable
Iterable
带来forEach
、iterator
和spliterator
方法,并SdkIterable
带来stream
方法。
public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) { Map<String, AttributeValue> expressionValues = Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(80.00)); ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) // 1. the 'attributesToProject()' method allows you to specify which values you want returned. .attributesToProject("id", "title", "authors", "price") // 2. Filter expression limits the items returned that match the provided criteria. .filterExpression(Expression.builder() .expression("price >= :min_value AND price <= :max_value") .expressionValues(expressionValues) .build()) .build(); // 3. A PageIterable object is returned by the scan method. PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request); logger.info("page count: {}", pagedResults.stream().count()); // 4. Log the returned ProductCatalog items using two variations. // 4a. This version sorts and logs the items of each page. pagedResults.stream().forEach(p -> p.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) )); // 4b. This version sorts and logs all items for all pages. pagedResults.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) ); }
使用异步 API
异步scan
方法将结果作为PagePublisher
对象返回。该PagePublisher
接口有两种subscribe
方法可用于处理响应页面。一种subscribe
方法来自org.reactivestreams.Publisher
父接口。要使用第一个选项处理页面,请将Subscriber
实例传递给该subscribe
方法。以下第一个示例显示了subscribe
方法的用法。
第二种subscribe
方法来自接SdkPublishersubscribe
受 a Consumer
Subscriber
。此subscribe
方法变体如以下第二个示例所示。
以下示例显示了该scan
方法的异步版本,该版本使用的过滤器表达式与前一个示例中所示的过滤器表达式相同。
在注释第 3 行之后,DynamoDbAsyncTable.scan
返回一个PagePublisher
对象。在下一行,代码创建了org.reactivestreams.Subscriber
接口的实例,该实例订阅了PagePublisher
后面的注释第 4 行。ProductCatalogSubscriber
该Subscriber
对象在ProductCatalogSubscriber
类示例的注释第 8 行之后的onNext
方法中从每个页面收集ProductCatalog
项目。这些项目存储在私有List
变量中,并通过ProductCatalogSubscriber.getSubscribedItems()
方法在调用代码中进行访问。这是在注释第 5 行之后调用的。
检索列表后,代码按价格对所有ProductCatalog
商品进行排序并记录每件商品。
ProductCatalogSubscriber
类CountDownLatch
public static void scanAsync(DynamoDbAsyncTable productCatalog) { ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) .attributesToProject("id", "title", "authors", "price") .filterExpression(Expression.builder() // 1. :min_value and :max_value are placeholders for the values provided by the map .expression("price >= :min_value AND price <= :max_value") // 2. Two values are needed for the expression and each is supplied as a map entry. .expressionValues( Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(400_000.00))) .build()) .build(); // 3. A PagePublisher object is returned by the scan method. PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request); ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber(); // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher. pagePublisher.subscribe(subscriber); // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber. subscriber.getSubscribedItems().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString())); // 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. // 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. }
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> { private CountDownLatch latch = new CountDownLatch(1); private Subscription subscription; private List<ProductCatalog> itemsFromAllPages = new ArrayList<>(); @Override public void onSubscribe(Subscription sub) { subscription = sub; subscription.request(1L); try { latch.await(); // Called by main thread blocking it until latch is released. } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void onNext(Page<ProductCatalog> productCatalogPage) { // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page. itemsFromAllPages.addAll(productCatalogPage.items()); subscription.request(1L); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { latch.countDown(); // Call by subscription thread; latch releases. } List<ProductCatalog> getSubscribedItems() { return this.itemsFromAllPages; } }
以下代码段示例使用了接受第 6 行Consumer
后注释的PagePublisher.subscribe
方法版本。Java lambda 参数消耗页面,这些页面会进一步处理每个项目。在此示例中,处理每个页面,对每个页面上的项目进行排序然后记录。
// 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
PagePublisher
解包模型实例的items
方法这样你的代码就可以直接处理这些项目。这种方法如以下片段所示。
// 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.