Work with paginated results: scans and queries - AWS SDK for Java 2.x

Work with paginated results: scans and queries

The scan, query and batch methods of the DynamoDB Enhanced Client API return responses with one or more pages. A page contains one or more items. Your code can process the response on per-page basis or it can process individual items.

A paginated response returned by the synchronous DynamoDbEnhancedClient client returns a PageIterable object, whereas a response returned by the asynchronous DynamoDbEnhancedAsyncClient returns a PagePublisher object.

This section looks at processing paginated results and provides examples that use the scan and query APIs.

Scan a table

The SDK's scan method corresponds to the DynamoDB operation of the same name. The DynamoDB Enhanced Client API offers the same options but it uses a familiar object model and handles the pagination for you.

First, we explore the PageIterable interface by looking at the scan method of the synchronous mapping class, DynamoDbTable.

Use the synchronous API

The following example shows the scan method that uses an expression to filter the items that are returned. The ProductCatalog is the model object that was shown earlier.

The filtering expression shown after comment line 2 limits the ProductCatalog items that are returned to those with a price value between 8.00 and 80.00 inclusively.

This example also excludes the isbn values by using the attributesToProject method shown after comment line 1.

After comment line 3, the PageIterable object, pagedResults, is returned by the scan method. The stream method of PageIterable returns a java.util.Stream object, which you can use to process the pages. In this example, the number of pages is counted and logged.

Starting with comment line 4, the example shows two variations of accessing the ProductCatalog items. The version after comment line 4a streams through each page and sorts and logs the items on each page. The version after comment line 4b skips the page iteration and accesses the items directly.

The PageIterable interface offers multiple ways to process results because of its two parent interfaces—java.lang.Iterable and SdkIterable. Iterable brings the forEach, iterator and spliterator methods, and SdkIterable brings the stream method.

public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) { Map<String, AttributeValue> expressionValues = Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(80.00)); ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) // 1. the 'attributesToProject()' method allows you to specify which values you want returned. .attributesToProject("id", "title", "authors", "price") // 2. Filter expression limits the items returned that match the provided criteria. .filterExpression(Expression.builder() .expression("price >= :min_value AND price <= :max_value") .expressionValues(expressionValues) .build()) .build(); // 3. A PageIterable object is returned by the scan method. PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request); logger.info("page count: {}", pagedResults.stream().count()); // 4. Log the returned ProductCatalog items using two variations. // 4a. This version sorts and logs the items of each page. pagedResults.stream().forEach(p -> p.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) )); // 4b. This version sorts and logs all items for all pages. pagedResults.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) ); }

Use the asynchronous API

The asynchronous scan method returns results as a PagePublisher object. The PagePublisher interface has two subscribe methods that you can use to process response pages. One subscribe method comes from the org.reactivestreams.Publisher parent interface. To process pages using this first option, pass a Subscriber instance to the subscribe method. The first example that follows shows the use of subscribe method.

The second subscribe method comes from the SdkPublisher interface. This version of subscribe accepts a Consumer rather than a Subscriber. This subscribe method variation is shown in the second example that follows.

The following example shows the asynchronous version of the scan method that uses the same filter expression shown in the previous example.

After comment line 3, DynamoDbAsyncTable.scan returns a PagePublisher object. On the next line, the code creates an instance of the org.reactivestreams.Subscriber interface, ProductCatalogSubscriber, which subscribes to the PagePublisher after comment line 4.

The Subscriber object collects the ProductCatalog items from each page in the onNext method after comment line 8 in the ProductCatalogSubscriber class example. The items are stored in the private List variable and are accessed in the calling code with the ProductCatalogSubscriber.getSubscribedItems() method. This is called after comment line 5.

After the list is retrieved, the code sorts all ProductCatalog items by price and logs each item.

The CountDownLatch in the ProductCatalogSubscriber class blocks the calling thread until all items have been added to the list before continuing after comment line 5.

public static void scanAsync(DynamoDbAsyncTable productCatalog) { ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) .attributesToProject("id", "title", "authors", "price") .filterExpression(Expression.builder() // 1. :min_value and :max_value are placeholders for the values provided by the map .expression("price >= :min_value AND price <= :max_value") // 2. Two values are needed for the expression and each is supplied as a map entry. .expressionValues( Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(400_000.00))) .build()) .build(); // 3. A PagePublisher object is returned by the scan method. PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request); ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber(); // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher. pagePublisher.subscribe(subscriber); // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber. subscriber.getSubscribedItems().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString())); // 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. // 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. }
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> { private CountDownLatch latch = new CountDownLatch(1); private Subscription subscription; private List<ProductCatalog> itemsFromAllPages = new ArrayList<>(); @Override public void onSubscribe(Subscription sub) { subscription = sub; subscription.request(1L); try { latch.await(); // Called by main thread blocking it until latch is released. } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void onNext(Page<ProductCatalog> productCatalogPage) { // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page. itemsFromAllPages.addAll(productCatalogPage.items()); subscription.request(1L); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { latch.countDown(); // Call by subscription thread; latch releases. } List<ProductCatalog> getSubscribedItems() { return this.itemsFromAllPages; } }

The following snippet example uses the version of the PagePublisher.subscribe method that accepts a Consumer after comment line 6. The Java lambda parameter consumes pages, which further process each item. In this example, each page is processed and the items on each page are sorted and then logged.

// 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

The items method of PagePublisher unwraps the model instances so that your code can process the items directly. This approach is shown in the following snippet.

// 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

Query a table

The query() method of the DynamoDbTable class finds items based on primary key values. The @DynamoDbPartitionKey annotation and the optional @DynamoDbSortKey annotation are used to define the primary key on your data class.

The query() method requires a partition key value that finds items that match the supplied value. If your table also defines a sort key, you can add a value for it to your query as an additional comparison condition to fine tune the results.

Except for processing the results, the synchronous and asynchronous versions of query() work the same. As with the scan API, the query API returns a PageIterable for a synchronous call and a PagePublisher for asynchronous call. We discussed the use of PageIterable and PagePublisher previously in the scan section.

Query method examples

The query() method code example that follows uses the MovieActor class. The data class defines a composite primary key that is made up of the movie attribute for the partition key and the actor attribute for the sort key.

The class also signals that it uses a global secondary index named acting_award_year. The index's composite primary key is composed of the actingaward attribute for the partition key and the actingyear for the sort key. Later in this topic, when we show how to create and use indexes, we'll refer to the acting_award_year index.

package org.example.tests.model; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey; import java.util.Objects; @DynamoDbBean public class MovieActor implements Comparable<MovieActor> { private String movieName; private String actorName; private String actingAward; private Integer actingYear; private String actingSchoolName; @DynamoDbPartitionKey @DynamoDbAttribute("movie") public String getMovieName() { return movieName; } public void setMovieName(String movieName) { this.movieName = movieName; } @DynamoDbSortKey @DynamoDbAttribute("actor") public String getActorName() { return actorName; } public void setActorName(String actorName) { this.actorName = actorName; } @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year") @DynamoDbAttribute("actingaward") public String getActingAward() { return actingAward; } public void setActingAward(String actingAward) { this.actingAward = actingAward; } @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"}) @DynamoDbAttribute("actingyear") public Integer getActingYear() { return actingYear; } public void setActingYear(Integer actingYear) { this.actingYear = actingYear; } @DynamoDbAttribute("actingschoolname") public String getActingSchoolName() { return actingSchoolName; } public void setActingSchoolName(String actingSchoolName) { this.actingSchoolName = actingSchoolName; } @Override public String toString() { final StringBuffer sb = new StringBuffer("MovieActor{"); sb.append("movieName='").append(movieName).append('\''); sb.append(", actorName='").append(actorName).append('\''); sb.append(", actingAward='").append(actingAward).append('\''); sb.append(", actingYear=").append(actingYear); sb.append(", actingSchoolName='").append(actingSchoolName).append('\''); sb.append('}'); return sb.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; MovieActor that = (MovieActor) o; return Objects.equals(movieName, that.movieName) && Objects.equals(actorName, that.actorName) && Objects.equals(actingAward, that.actingAward) && Objects.equals(actingYear, that.actingYear) && Objects.equals(actingSchoolName, that.actingSchoolName); } @Override public int hashCode() { return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName); } @Override public int compareTo(MovieActor o) { if (this.movieName.compareTo(o.movieName) != 0){ return this.movieName.compareTo(o.movieName); } else { return this.actorName.compareTo(o.actorName); } } }

The code examples that follow query against the following items.

MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'} MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'} MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'} MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'} MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'} MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'} MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}

The following code defines two QueryConditional instances. QueryConditionals work with key values—either the partition key alone or in combination with the sort key—and correspond to the key conditional expressions of the DynamoDB service API. After comment line 1, the example defines the keyEqual instance that matches items with a partition value of movie01.

This example also defines a filter expression that filters off any item that has no actingschoolname on after comment line 2.

After comment line 3, the example shows the QueryEnhancedRequest instance that the code passes to the DynamoDbTable.query() method. This object combines the key condition and filter that the SDK uses to generate the request to the DynamoDB service.

public static void query(DynamoDbTable movieActorTable) { // 1. Define a QueryConditional instance to return items matching a partition value. QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01")); // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria. QueryConditional sortGreaterThanOrEqualTo = QueryConditional.sortGreaterThanOrEqualTo(b -> b.partitionValue("movie01").sortValue("actor2")); // 2. Define a filter expression that filters out items whose attribute value is null. final Expression filterOutNoActingschoolname = Expression.builder().expression("attribute_exists(actingschoolname)").build(); // 3. Build the query request. QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(keyEqual) .filterExpression(filterOutNoActingschoolname) .build(); // 4. Perform the query. PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery); logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages. pagedResults.items().stream() .sorted() .forEach( item -> logger.info(item.toString()) // Log the sorted list of items. );

The following is the output from running the method. The output displays items with a movieName value of movie01 and displays no items with actingSchoolName equal to null.

2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}

In the following query request variation shown previously after comment line 3, the code replaces the keyEqual QueryConditional with the sortGreaterThanOrEqualTo QueryConditional that was defined after comment line 1a. The following code also removes the filter expression.

QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(sortGreaterThanOrEqualTo)

Because this table has a composite primary key, all QueryConditional instances require a partition key value. QueryConditional methods that begin with sort... indicate that a sort key is required. The results are not sorted.

The following output displays the results from the query. The query returns items that have a movieName value equal to movie01 and only items that have an actorName value that is greater than or equal to actor2. Because the filter was removed, the query returns items that have no value for the actingSchoolName attribute.

2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}