使用分頁結果:掃描和查詢 - AWS SDK for Java 2.x

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用分頁結果:掃描和查詢

DynamoDB 增強型用戶端 API 的querybatch方法會傳回包含一個或多個頁面的回應。scan一個頁面包含一個或多個項目。您的程式碼可以處理每個頁面的回應,也可以處理個別項目。

同步DynamoDbEnhancedClient用戶端傳回的分頁回應會傳回PageIterable物件,而非同步傳回的回應則會傳DynamoDbEnhancedAsyncClientPagePublisher物件。

本節介紹如何處理分頁結果,並提供使用掃描和查詢 API 的範例。

掃描資料表

SDK 的scan方法對應於同名的 DynamoDB 作業。DynamoDB 增強型用戶端 API 提供相同的選項,但它使用熟悉的物件模型並為您處理分頁。

首先,我們通過查看同步映射類的scan方法來探索PageIterable界面,DynamoDbTable.

使用同步 API

下列範例顯示使scan運算式篩選傳回項目的方法。ProductCatalog是之前所示的模型物件。

註解第 2 行後顯示的篩選運算式會限制傳回價格值介於 8.00 到 80.00 之間的ProductCatalog項目。

此範例也會使用註解行 1 之後顯示的attributesToProject方法來排除這isbn些值。

在註解第 3 行之後,PageIterable物件pagedResults,由scan方法傳回。的stream方法PageIterable返回一個java.util.Stream對象,您可以用它來處理頁面。在此範例中,會計算並記錄頁數。

從註解第 4 行開始,範例顯示存取ProductCatalog項目的兩種變化。註釋行 4a 之後的版本流式傳輸每個頁面,並對每個頁面上的項目進行排序和記錄。註釋行 4b 之後的版本會跳過頁面迭代並直接訪問項目。

該接PageIterable口提供了多種處理結果的方法,因為它具有兩個父接口-java.lang.IterableSdkIterableIterable帶來了forEachiteratorspliterator方法,並SdkIterable帶來了stream方法。

public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) { Map<String, AttributeValue> expressionValues = Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(80.00)); ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) // 1. the 'attributesToProject()' method allows you to specify which values you want returned. .attributesToProject("id", "title", "authors", "price") // 2. Filter expression limits the items returned that match the provided criteria. .filterExpression(Expression.builder() .expression("price >= :min_value AND price <= :max_value") .expressionValues(expressionValues) .build()) .build(); // 3. A PageIterable object is returned by the scan method. PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request); logger.info("page count: {}", pagedResults.stream().count()); // 4. Log the returned ProductCatalog items using two variations. // 4a. This version sorts and logs the items of each page. pagedResults.stream().forEach(p -> p.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) )); // 4b. This version sorts and logs all items for all pages. pagedResults.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) ); }

使用非同步 API

非同步scan方法會以PagePublisher物件的形式傳回結果。PagePublisher介面有兩subscribe種方法可用來處理回應頁面。一subscribe種方法來自org.reactivestreams.Publisher父界面。若要使用此第一個選項處理頁面,請將Subscriber執行個體傳遞給subscribe方法。接下來的第一個示例顯示了使用subscribe方法。

第二subscribe種方法來自SdkPublisher界面。這個版本的subscribe接受Consumer而不是Subscriber. 這subscribe種方法的變化示於下面的第二個例子。

下列範例顯示scan方法的非同步版本,該版本使用前一個範例中所示的相同篩選運算式。

在註釋行 3 之後,DynamoDbAsyncTable.scan返回一個PagePublisher對象。在下一行中,程式碼會建立org.reactivestreams.Subscriber介面的執行個體ProductCatalogSubscriber,該執行個體會訂閱註解行 4 PagePublisher 之後。

Subscriber物件會在ProductCatalogSubscriber類別範例中的註解行 8 之後,從onNext方法中的每個頁面收集ProductCatalog項目。這些項目會儲存在 private List 變數中,並且可以使用ProductCatalogSubscriber.getSubscribedItems()方法在呼叫程式碼中存取。這是在註釋行 5 之後調用。

檢索列表後,代碼按價格對所有ProductCatalog項目進行排序,並記錄每個項目。

ProductCatalogSubscriber類別CountDownLatch中的會封鎖呼叫執行緒,直到所有項目都已新增至清單,然後在註解第 5 行之後繼續執行。

public static void scanAsync(DynamoDbAsyncTable productCatalog) { ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) .attributesToProject("id", "title", "authors", "price") .filterExpression(Expression.builder() // 1. :min_value and :max_value are placeholders for the values provided by the map .expression("price >= :min_value AND price <= :max_value") // 2. Two values are needed for the expression and each is supplied as a map entry. .expressionValues( Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(400_000.00))) .build()) .build(); // 3. A PagePublisher object is returned by the scan method. PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request); ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber(); // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher. pagePublisher.subscribe(subscriber); // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber. subscriber.getSubscribedItems().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString())); // 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. // 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. }
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> { private CountDownLatch latch = new CountDownLatch(1); private Subscription subscription; private List<ProductCatalog> itemsFromAllPages = new ArrayList<>(); @Override public void onSubscribe(Subscription sub) { subscription = sub; subscription.request(1L); try { latch.await(); // Called by main thread blocking it until latch is released. } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void onNext(Page<ProductCatalog> productCatalogPage) { // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page. itemsFromAllPages.addAll(productCatalogPage.items()); subscription.request(1L); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { latch.countDown(); // Call by subscription thread; latch releases. } List<ProductCatalog> getSubscribedItems() { return this.itemsFromAllPages; } }

下列程式碼片段範例會使用Consumer在註解行 6 之後接受 a 的PagePublisher.subscribe方法版本。Java lambda 參數會消耗頁面,進一步處理每個項目。在此範例中,會處理每個頁面,並排序每個頁面上的項目,然後記錄。

// 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

PagePublisher解散模型實例的items方法,以便您的代碼可以直接處理這些項目。這種方法顯示在下面的代碼片段中。

// 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

查詢資料表

DynamoDbTable類別的query()方法會根據主索引鍵值尋找項目。註@DynamoDbPartitionKey釋和可選@DynamoDbSortKey註釋用於定義數據類的主鍵。

此方query()法需要一個分區索引鍵值,該值會尋找符合所提供值的項目。如果您的資料表也定義了排序索引鍵,您可以將其值新增至查詢,做為額外的比較條件,以微調結果。

除了處理結果外,同步和非同步版本的query()工作方式相同。與 API 一PageIterable樣,scanAPI 會傳回同步呼叫的同步呼叫,以及用PagePublisher於非同步呼叫的傳回。query我們討論了使用PageIterablePagePublisher以前在掃描部分。

Query方法範例

接下來的query()方法程式碼範例使用該MovieActor類別。資料類別會定義複合主索引鍵,該索引鍵由分割索引鍵的movie屬性和排序索引鍵的actor屬性組成。

該類還表示它使用名為的全局次要索引acting_award_year。索引的複合主索引鍵是由分割索引鍵的actingaward屬性和排序索引鍵的actingyear屬性所組成。稍後在本主題中,當我們展示如何創建和使用索引時,我們將參考索acting_award_year引。

package org.example.tests.model; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey; import java.util.Objects; @DynamoDbBean public class MovieActor implements Comparable<MovieActor> { private String movieName; private String actorName; private String actingAward; private Integer actingYear; private String actingSchoolName; @DynamoDbPartitionKey @DynamoDbAttribute("movie") public String getMovieName() { return movieName; } public void setMovieName(String movieName) { this.movieName = movieName; } @DynamoDbSortKey @DynamoDbAttribute("actor") public String getActorName() { return actorName; } public void setActorName(String actorName) { this.actorName = actorName; } @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year") @DynamoDbAttribute("actingaward") public String getActingAward() { return actingAward; } public void setActingAward(String actingAward) { this.actingAward = actingAward; } @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"}) @DynamoDbAttribute("actingyear") public Integer getActingYear() { return actingYear; } public void setActingYear(Integer actingYear) { this.actingYear = actingYear; } @DynamoDbAttribute("actingschoolname") public String getActingSchoolName() { return actingSchoolName; } public void setActingSchoolName(String actingSchoolName) { this.actingSchoolName = actingSchoolName; } @Override public String toString() { final StringBuffer sb = new StringBuffer("MovieActor{"); sb.append("movieName='").append(movieName).append('\''); sb.append(", actorName='").append(actorName).append('\''); sb.append(", actingAward='").append(actingAward).append('\''); sb.append(", actingYear=").append(actingYear); sb.append(", actingSchoolName='").append(actingSchoolName).append('\''); sb.append('}'); return sb.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; MovieActor that = (MovieActor) o; return Objects.equals(movieName, that.movieName) && Objects.equals(actorName, that.actorName) && Objects.equals(actingAward, that.actingAward) && Objects.equals(actingYear, that.actingYear) && Objects.equals(actingSchoolName, that.actingSchoolName); } @Override public int hashCode() { return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName); } @Override public int compareTo(MovieActor o) { if (this.movieName.compareTo(o.movieName) != 0){ return this.movieName.compareTo(o.movieName); } else { return this.actorName.compareTo(o.actorName); } } }

跟隨查詢下列項目的程式碼範例。

MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'} MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'} MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'} MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'} MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'} MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'} MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}

下面的代碼定義了兩個QueryConditional實例。 QueryConditionals使用索引鍵值 (單獨使用分區索引鍵或與排序索引鍵組合),並對應至 DynamoDB 服務 API 的關鍵條件運算式。在註解行 1 之後,範例會定義符合分割區值之項目的keyEqual執行個體movie01

此範例也會定義篩選器運算式,篩選掉註解行 2 之後沒有開actingschoolname啟的任何項目。

在註解第 3 行之後,範例會顯示程式碼傳遞給DynamoDbTable.query()方法的QueryEnhancedRequest執行個體。此物件結合了 SDK 用來產生 DynamoDB 服務請求的金鑰條件和篩選器。

public static void query(DynamoDbTable movieActorTable) { // 1. Define a QueryConditional instance to return items matching a partition value. QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01")); // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria. QueryConditional sortGreaterThanOrEqualTo = QueryConditional.sortGreaterThanOrEqualTo(b -> b.partitionValue("movie01").sortValue("actor2")); // 2. Define a filter expression that filters out items whose attribute value is null. final Expression filterOutNoActingschoolname = Expression.builder().expression("attribute_exists(actingschoolname)").build(); // 3. Build the query request. QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(keyEqual) .filterExpression(filterOutNoActingschoolname) .build(); // 4. Perform the query. PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery); logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages. pagedResults.items().stream() .sorted() .forEach( item -> logger.info(item.toString()) // Log the sorted list of items. );

以下是運行該方法的輸出。輸出會顯示movieName值為 movie01 的項目,並且不會顯示actingSchoolName等於的項目。null

2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}

在先前註解行 3 之後顯示的下列查詢要求變體中,程式碼會以註解行 1a 之後定義的取代。keyEqual QueryConditional sortGreaterThanOrEqualTo QueryConditional下列程式碼也會移除篩選運算式。

QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(sortGreaterThanOrEqualTo)

由於此資料表具有複合主索引鍵,因此所有QueryConditional執行個體都需要分割索引鍵值。 QueryConditional以開頭的方法sort...表示需要排序索引鍵。結果不會排序。

下列輸出會顯示查詢的結果。查詢會傳回movieName值等於 movie01 的項目,而且只會傳回actorName值大於或等於 ac tor2 的項目。因為篩選器已移除,所以查詢會傳回沒有actingSchoolName屬性值的項目。

2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}