扫描表 - AWS SDK for Java 2.x

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

扫描表

SDK 的scan方法对应于同名的 DynamoDB 操作。DynamoDB 增强型客户端 API 提供相同的选项,但它使用熟悉的对象模型并为您处理分页。

首先,我们通过查看同步映射类的scan方法来探索PageIterable接口DynamoDbTable

使用同步 API

以下示例显示了使用表达式筛选返回项scan的方法。ProductCatalog是前面显示的模型对象。

注释行 1 后显示的筛选表达式将返回的ProductCatalog商品限制为价格值介于 8.00 到 80.00 之间的商品。

此示例还使用注释行 2 后显示的attributesToProject方法排除了这些isbn值。

在注释第 3 行pagedResult,该scan方法返回PageIterable对象。的stream方法PageIterable返回一个java.util.Stream对象,您可以使用该对象来处理页面。在此示例中,对页数进行了计数和记录。

从评论第 4 行开始,该示例显示了访问ProductCatalog项目的两种变体。评论行 2a 之后的版本会流式传输每个页面,并对每个页面上的项目进行排序和记录。注释行 2b 之后的版本会跳过页面迭代直接访问项目。

PageIterable接口提供了多种处理结果的方法,因为它有两个父接口—— java.lang.IterableSdkIterableIterable带来forEachiteratorspliterator方法,并SdkIterable带来stream方法。

public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) { Map<String, AttributeValue> expressionValues = Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(80.00)); ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) // 1. the 'attributesToProject()' method allows you to specify which values you want returned. .attributesToProject("id", "title", "authors", "price") // 2. Filter expression limits the items returned that match the provided criteria. .filterExpression(Expression.builder() .expression("price >= :min_value AND price <= :max_value") .expressionValues(expressionValues) .build()) .build(); // 3. A PageIterable object is returned by the scan method. PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request); logger.info("page count: {}", pagedResults.stream().count()); // 4. Log the returned ProductCatalog items using two variations. // 4a. This version sorts and logs the items of each page. pagedResults.stream().forEach(p -> p.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) )); // 4b. This version sorts and logs all items for all pages. pagedResults.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) ); }

使用异步 API

异步scan方法将结果作为PagePublisher对象返回。该PagePublisher接口有两种subscribe方法可用于处理响应页面。一种subscribe方法来自org.reactivestreams.Publisher父接口。要使用第一个选项处理页面,请将Subscriber实例传递给该subscribe方法。以下第一个示例显示了subscribe方法的用法。

第二种subscribe方法来自接SdkPublisher口。此版本接subscribe受 a Consumer而不是 a Subscriber。此subscribe方法变体如以下第二个示例所示。

以下示例显示了该scan方法的异步版本,该版本使用的过滤器表达式与前一个示例中所示的过滤器表达式相同。

在注释第 3 行之后,DynamoDbAsyncTable.scan返回一个PagePublisher对象。在下一行,代码创建了org.reactivestreams.Subscriber接口的实例,该实例订阅了PagePublisher后面的注释第 4 行。ProductCatalogSubscriber

Subscriber对象在ProductCatalogSubscriber类示例的注释第 8 行之后的onNext方法中从每个页面收集ProductCatalog项目。这些项目存储在私有List变量中,并通过ProductCatalogSubscriber.getSubscribedItems()方法在调用代码中进行访问。这是在注释第 5 行之后调用的。

检索列表后,代码按价格对所有ProductCatalog商品进行排序并记录每件商品。

ProductCatalogSubscriberCountDownLatch中的会阻塞调用线程,直到将所有项目添加到列表中,然后在注释第 5 行之后继续。

public static void scanAsync(DynamoDbAsyncTable productCatalog) { ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) .attributesToProject("id", "title", "authors", "price") .filterExpression(Expression.builder() // 1. :min_value and :max_value are placeholders for the values provided by the map .expression("price >= :min_value AND price <= :max_value") // 2. Two values are needed for the expression and each is supplied as a map entry. .expressionValues( Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(400_000.00))) .build()) .build(); // 3. A PagePublisher object is returned by the scan method. PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request); ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber(); // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher. pagePublisher.subscribe(subscriber); // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber. subscriber.getSubscribedItems().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString())); // 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. // 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. }
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> { private CountDownLatch latch = new CountDownLatch(1); private Subscription subscription; private List<ProductCatalog> itemsFromAllPages = new ArrayList<>(); @Override public void onSubscribe(Subscription sub) { subscription = sub; subscription.request(1L); try { latch.await(); // Called by main thread blocking it until latch is released. } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void onNext(Page<ProductCatalog> productCatalogPage) { // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page. itemsFromAllPages.addAll(productCatalogPage.items()); subscription.request(1L); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { latch.countDown(); // Call by subscription thread; latch releases. } List<ProductCatalog> getSubscribedItems() { return this.itemsFromAllPages; } }

以下代码段示例使用了接受第 6 行Consumer后注释的PagePublisher.subscribe方法版本。Java lambda 参数消耗页面,这些页面会进一步处理每个项目。在此示例中,处理每个页面,对每个页面上的项目进行排序然后记录。

// 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

PagePublisher解包模型实例的items方法这样你的代码就可以直接处理这些项目。这种方法如以下片段所示。

// 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.