ページ分割された結果を処理する: スキャンとクエリ - AWS SDK for Java 2.x

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

ページ分割された結果を処理する: スキャンとクエリ

DynamoDB Enhanced Client API のscanquery、および batch メソッドは、1 つ以上のページを含むレスポンスを返します。ページには、1 つ以上のアイテムが含まれます。コードはページごとにレスポンスを処理することも、個々のアイテムを処理することもできます。

同期 DynamoDbEnhancedClient クライアントから返されるページ分割されたレスポンスは PageIterable オブジェクトを返し、非同期 DynamoDbEnhancedAsyncClient から返されるレスポンスは PagePublisher オブジェクトを返します。

このセクションでは、ページ分割された結果の処理について説明し、スキャン API とクエリ API を使用する例を紹介します。

テーブルのスキャン

SDK scan のメソッドは、同じ名前の DynamoDB オペレーションに対応しています。DynamoDB Enhanced Client API にも同じオプションがありますが、使い慣れたオブジェクトモデルを使用してページ分割を処理します。

まず、同期マッピングクラス DynamoDbTablescan メソッドを見て、PageIterable インターフェイスを調べます。

同期 API を使用する

次の例は、を使用して返されるアイテムをフィルタリングする scan メソッドを示しています。ProductCatalog は、前に示したモデルオブジェクトです。

コメント行 2 の後に表示されるフィルタリング式は、返されるProductCatalog項目を 8.00~80.00 の料金値を持つ項目に制限します。

この例では、コメント行 1 の後に示されている attributesToProjectメソッドを使用してisbn値を除外します。

コメント行 3 の後、 PageIterable オブジェクト pagedResultsscanメソッドによって返されます。PageIterablestream メソッドは、ページの処理に使用できる java.util.Stream オブジェクトを返します。この例では、ページ数がカウントされ、ログに記録されます。

コメント行 4 から始まる例では、ProductCatalog アイテムへのアクセス方法が 2 種類示されています。コメント行 4a 以降のバージョンは、各ページをストリーミングし、各ページの項目をソートしてログに記録します。コメント行 4b 以降のバージョンは、ページの反復をスキップし、項目に直接アクセスします。

PageIterable インターフェースには、java.lang.IterableSdkIterable という 2 つの親インターフェースがあるため、結果を処理する方法が複数あります。IterableforEachiteratorspliterator メソッドを、SdkIterablestream メソッドをもたらします。

public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) { Map<String, AttributeValue> expressionValues = Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(80.00)); ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) // 1. the 'attributesToProject()' method allows you to specify which values you want returned. .attributesToProject("id", "title", "authors", "price") // 2. Filter expression limits the items returned that match the provided criteria. .filterExpression(Expression.builder() .expression("price >= :min_value AND price <= :max_value") .expressionValues(expressionValues) .build()) .build(); // 3. A PageIterable object is returned by the scan method. PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request); logger.info("page count: {}", pagedResults.stream().count()); // 4. Log the returned ProductCatalog items using two variations. // 4a. This version sorts and logs the items of each page. pagedResults.stream().forEach(p -> p.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) )); // 4b. This version sorts and logs all items for all pages. pagedResults.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) ); }

非同期 API を使用する

非同期 scan メソッドは結果を PagePublisher オブジェクトとして返します。PagePublisher インターフェースには、レスポンスページの処理に使用できる subscribe メソッドが 2 つあります。subscribe メソッドの一つは、org.reactivestreams.Publisher 親インターフェースからのものです。この最初のオプションを使用してページを処理するには、subscribe メソッドに Subscriber インスタンスを渡します。次の最初の例は、subscribe メソッドの使用例を示しています。

2 つ目の subscribe メソッドは、SdkPublisher インターフェイスからのものです。このバージョンの subscribe は、Subscriber ではなく Consumer を受け入れます。この subscribe メソッドのバリエーションは、次の 2 番目の例に示されています。

次の例は、前の例で示されたのと同じフィルタ式を使用する非同期バージョンの scan メソッドです。

コメント行 3 の後、DynamoDbAsyncTable.scanPagePublisher オブジェクトを返します。次の行では、コードによって org.reactivestreams.Subscriber インターフェース、ProductCatalogSubscriber のインスタンスが作成され、コメント行 4 の後に PagePublisher がサブスクライブされます。

Subscriber オブジェクトは、ProductCatalogSubscriber クラスの例のコメント行 8 の後に、onNext メソッドの各ページから ProductCatalog アイテムを収集します。アイテムはプライベート変数 List に保存され、ProductCatalogSubscriber.getSubscribedItems() メソッドを使用して呼び出し元のコードからアクセスされます。これはコメント行 5 の後に呼び出されます。

リストが取得されると、コードはすべての ProductCatalog アイテムを価格順に並べ替え、各アイテムをログに記録します。

ProductCatalogSubscriber クラスの CountDownLatch は、コメント行 5 以降を継続する前に、すべてのアイテムがリストに追加されるまで、呼び出し元のスレッドをブロックします。

public static void scanAsync(DynamoDbAsyncTable productCatalog) { ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) .attributesToProject("id", "title", "authors", "price") .filterExpression(Expression.builder() // 1. :min_value and :max_value are placeholders for the values provided by the map .expression("price >= :min_value AND price <= :max_value") // 2. Two values are needed for the expression and each is supplied as a map entry. .expressionValues( Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(400_000.00))) .build()) .build(); // 3. A PagePublisher object is returned by the scan method. PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request); ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber(); // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher. pagePublisher.subscribe(subscriber); // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber. subscriber.getSubscribedItems().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString())); // 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. // 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. }
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> { private CountDownLatch latch = new CountDownLatch(1); private Subscription subscription; private List<ProductCatalog> itemsFromAllPages = new ArrayList<>(); @Override public void onSubscribe(Subscription sub) { subscription = sub; subscription.request(1L); try { latch.await(); // Called by main thread blocking it until latch is released. } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void onNext(Page<ProductCatalog> productCatalogPage) { // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page. itemsFromAllPages.addAll(productCatalogPage.items()); subscription.request(1L); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { latch.countDown(); // Call by subscription thread; latch releases. } List<ProductCatalog> getSubscribedItems() { return this.itemsFromAllPages; } }

次のスニペット例では、コメント行 6 以降に Consumer を受け入れる PagePublisher.subscribe メソッドのバージョンを使用しています。Java ラムダパラメータはページを消費し、各アイテムをさらに処理します。この例では、各ページが処理され、各ページのアイテムがソートされてからログに記録されます。

// 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

PagePublisheritems メソッドはモデルインスタンスをアンラップして、コードでアイテムを直接処理できるようにします。このアプローチは次のスニペットに示されています。

// 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.

テーブルに対してクエリを実行する

DynamoDB 拡張クライアントを使用してテーブルをクエリし、特定の条件に一致する複数の項目を取得できます。query() メソッドは、 データクラスで定義された @DynamoDbPartitionKeyとオプションの@DynamoDbSortKey注釈を使用して、プライマリキー値に基づいて項目を検索します。

query() メソッドにはパーティションキー値が必要で、オプションでソートキー条件を受け入れて結果をさらに絞り込みます。scan API と同様に、クエリは同期呼び出しPageIterableの場合は を返し、非同期呼び出しPagePublisherの場合は を返します。

Query メソッドの例

以下の query() メソッドコード例では MovieActor クラスを使用しています。データクラスは、 movie 属性をパーティションキーとして、 actor 属性をソートキーとして構成する複合プライマリキーを定義します。

package org.example.tests.model; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey; import java.util.Objects; @DynamoDbBean public class MovieActor implements Comparable<MovieActor> { private String movieName; private String actorName; private String actingAward; private Integer actingYear; private String actingSchoolName; @DynamoDbPartitionKey @DynamoDbAttribute("movie") public String getMovieName() { return movieName; } public void setMovieName(String movieName) { this.movieName = movieName; } @DynamoDbSortKey @DynamoDbAttribute("actor") public String getActorName() { return actorName; } public void setActorName(String actorName) { this.actorName = actorName; } @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year") @DynamoDbAttribute("actingaward") public String getActingAward() { return actingAward; } public void setActingAward(String actingAward) { this.actingAward = actingAward; } @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"}) @DynamoDbAttribute("actingyear") public Integer getActingYear() { return actingYear; } public void setActingYear(Integer actingYear) { this.actingYear = actingYear; } @DynamoDbAttribute("actingschoolname") public String getActingSchoolName() { return actingSchoolName; } public void setActingSchoolName(String actingSchoolName) { this.actingSchoolName = actingSchoolName; } @Override public String toString() { final StringBuffer sb = new StringBuffer("MovieActor{"); sb.append("movieName='").append(movieName).append('\''); sb.append(", actorName='").append(actorName).append('\''); sb.append(", actingAward='").append(actingAward).append('\''); sb.append(", actingYear=").append(actingYear); sb.append(", actingSchoolName='").append(actingSchoolName).append('\''); sb.append('}'); return sb.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; MovieActor that = (MovieActor) o; return Objects.equals(movieName, that.movieName) && Objects.equals(actorName, that.actorName) && Objects.equals(actingAward, that.actingAward) && Objects.equals(actingYear, that.actingYear) && Objects.equals(actingSchoolName, that.actingSchoolName); } @Override public int hashCode() { return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName); } @Override public int compareTo(MovieActor o) { if (this.movieName.compareTo(o.movieName) != 0){ return this.movieName.compareTo(o.movieName); } else { return this.actorName.compareTo(o.actorName); } } }

以下のコード例では、以下の項目に対してクエリを実行します。

MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'} MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'} MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'} MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'} MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'} MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'} MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}

次のコードは、 keyEqual (コメント行 1 の後) と sortGreaterThanOrEqualTo (コメント行 1a の後) の 2 つのQueryConditionalインスタンスを定義します。

パーティションキーで項目をクエリする

keyEqual インスタンスは、パーティションキー値が の項目に一致しますmovie01

この例では、actingschoolname値を持たない項目を除外するフィルター式をコメント行 2 の後に定義します。

は、クエリのキー条件とフィルター式QueryEnhancedRequestを組み合わせます。

public static void query(DynamoDbTable movieActorTable) { // 1. Define a QueryConditional instance to return items matching a partition value. QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01")); // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria. QueryConditional sortGreaterThanOrEqualTo = QueryConditional.sortGreaterThanOrEqualTo(b -> b.partitionValue("movie01").sortValue("actor2")); // 2. Define a filter expression that filters out items whose attribute value is null. final Expression filterOutNoActingschoolname = Expression.builder().expression("attribute_exists(actingschoolname)").build(); // 3. Build the query request. QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(keyEqual) .filterExpression(filterOutNoActingschoolname) .build(); // 4. Perform the query using the "keyEqual" conditional and filter expression. PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery); logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages. pagedResults.items().stream() .sorted() .forEach( item -> logger.info(item.toString()) // Log the sorted list of items. );
例 – 条件付きkeyEqualクエリを使用した出力

以下は、メソッドを実行したときの出力です。出力には movie01 movieName 値を持つ項目が表示され、null と等しい actingSchoolName の項目は表示されません。

2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}

パーティションキーとソートキーで項目をクエリする

は、actor2 以上の値のソートキー条件を追加して、パーティションキーの一致 (movie01) をsortGreaterThanOrEqualToQueryConditional絞り込みます。

で始まる QueryConditionalメソッドでは、ソートキー値に基づいて比較してクエリを一致およびさらに絞り込むためにパーティションキー値sortが必要です。 Sortメソッド名の は、結果がソートされることを意味するのではなく、ソートキー値が比較に使用されます。

次のスニペットでは、コメント行 3 の後に前に示したクエリリクエストを変更します。このスニペットは、keyEqual」クエリ条件を、コメント行 1a の後に定義されたsortGreaterThanOrEqualTo」クエリ条件に置き換えます。次のコードではフィルター式も削除されています。

QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(sortGreaterThanOrEqualTo).build();
例 – 条件付きsortGreaterThanOrEqualToクエリを使用した出力

このクエリでは次の出力が生成されます。このクエリは、movie01 と等しい movieName 値を持つアイテムを返し、actor2 以上の actorName 値のアイテムのみを返します。フィルターを削除するため、クエリはactingSchoolName属性の値がない項目を返します。

2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}