翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
ページ分割された結果を処理する: スキャンとクエリ
DynamoDB Enhanced Client API のscan
、query
、および batch
メソッドは、1 つ以上のページを含むレスポンスを返します。ページには、1 つ以上のアイテムが含まれます。コードはページごとにレスポンスを処理することも、個々のアイテムを処理することもできます。
同期DynamoDbEnhancedClient
クライアントによって返されるページ分割レスポンスはPageIterableDynamoDbEnhancedAsyncClient
を返します。
このセクションでは、ページ分割された結果の処理について説明し、スキャン API とクエリ API を使用する例を紹介します。
テーブルのスキャン
SDK scan
まず、同期マッピングクラス の scan
メソッドを調べて、PageIterable
インターフェイスを調べますDynamoDbTable
同期 API を使用する
次の例は、式scan
メソッドを示しています。ProductCatalog は、前に表示したモデルオブジェクトです。
コメント行 2 の後に表示されるフィルタリング式は、返されるProductCatalog
項目を、料金値が 8.00~80.00 の項目に制限します。
この例では、コメント行 1 の後に示されている attributesToProject
メソッドを使用してisbn
値を除外します。
コメント行 3 の後、 PageIterable
オブジェクト pagedResults
は scan
メソッドによって返されます。PageIterable
の stream
メソッドは、ページの処理に使用できる java.util.Stream
コメント行 4 から始まる例では、ProductCatalog
アイテムへのアクセス方法が 2 種類示されています。コメント行 4a の後のバージョンは、各ページをストリーミングし、各ページの項目をソートしてログに記録します。コメント行 4b の後のバージョンは、ページの反復をスキップし、項目に直接アクセスします。
PageIterable
インターフェースには、java.lang.Iterable
SdkIterable
Iterable
は forEach
、iterator
、spliterator
メソッドを、SdkIterable
は stream
メソッドをもたらします。
public static void scanSync(DynamoDbTable<ProductCatalog> productCatalog) { Map<String, AttributeValue> expressionValues = Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(80.00)); ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) // 1. the 'attributesToProject()' method allows you to specify which values you want returned. .attributesToProject("id", "title", "authors", "price") // 2. Filter expression limits the items returned that match the provided criteria. .filterExpression(Expression.builder() .expression("price >= :min_value AND price <= :max_value") .expressionValues(expressionValues) .build()) .build(); // 3. A PageIterable object is returned by the scan method. PageIterable<ProductCatalog> pagedResults = productCatalog.scan(request); logger.info("page count: {}", pagedResults.stream().count()); // 4. Log the returned ProductCatalog items using two variations. // 4a. This version sorts and logs the items of each page. pagedResults.stream().forEach(p -> p.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) )); // 4b. This version sorts and logs all items for all pages. pagedResults.items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach( item -> logger.info(item.toString()) ); }
非同期 API を使用する
非同期 scan
メソッドは結果を PagePublisher
オブジェクトとして返します。PagePublisher
インターフェースには、レスポンスページの処理に使用できる subscribe
メソッドが 2 つあります。subscribe
メソッドの一つは、org.reactivestreams.Publisher
親インターフェースからのものです。この最初のオプションを使用してページを処理するには、subscribe
メソッドに Subscriber
インスタンスを渡します。次の最初の例は、subscribe
メソッドの使用例を示しています。
2 番目のsubscribe
方法は、 SdkPublishersubscribe
は、Subscriber
ではなく Consumer
subscribe
メソッドのバリエーションは、次の 2 番目の例に示されています。
次の例は、前の例で示されたのと同じフィルタ式を使用する非同期バージョンの scan
メソッドです。
コメント行 3 の後、DynamoDbAsyncTable.scan
は PagePublisher
オブジェクトを返します。次の行では、コードによって org.reactivestreams.Subscriber
インターフェース、ProductCatalogSubscriber
のインスタンスが作成され、コメント行 4 の後に PagePublisher
がサブスクライブされます。
Subscriber
オブジェクトは、ProductCatalogSubscriber
クラスの例のコメント行 8 の後に、onNext
メソッドの各ページから ProductCatalog
アイテムを収集します。アイテムはプライベート変数 List
に保存され、ProductCatalogSubscriber.getSubscribedItems()
メソッドを使用して呼び出し元のコードからアクセスされます。これはコメント行 5 の後に呼び出されます。
リストが取得されると、コードはすべての ProductCatalog
アイテムを価格順に並べ替え、各アイテムをログに記録します。
CountDownLatchProductCatalogSubscriber
クラスの は、コメント行 5 の後に続行する前に、すべての項目がリストに追加されるまで呼び出し元のスレッドをブロックします。
public static void scanAsync(DynamoDbAsyncTable productCatalog) { ScanEnhancedRequest request = ScanEnhancedRequest.builder() .consistentRead(true) .attributesToProject("id", "title", "authors", "price") .filterExpression(Expression.builder() // 1. :min_value and :max_value are placeholders for the values provided by the map .expression("price >= :min_value AND price <= :max_value") // 2. Two values are needed for the expression and each is supplied as a map entry. .expressionValues( Map.of( ":min_value", numberValue(8.00), ":max_value", numberValue(400_000.00))) .build()) .build(); // 3. A PagePublisher object is returned by the scan method. PagePublisher<ProductCatalog> pagePublisher = productCatalog.scan(request); ProductCatalogSubscriber subscriber = new ProductCatalogSubscriber(); // 4. Subscribe the ProductCatalogSubscriber to the PagePublisher. pagePublisher.subscribe(subscriber); // 5. Retrieve all collected ProductCatalog items accumulated by the subscriber. subscriber.getSubscribedItems().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString())); // 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. // 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing. }
private static class ProductCatalogSubscriber implements Subscriber<Page<ProductCatalog>> { private CountDownLatch latch = new CountDownLatch(1); private Subscription subscription; private List<ProductCatalog> itemsFromAllPages = new ArrayList<>(); @Override public void onSubscribe(Subscription sub) { subscription = sub; subscription.request(1L); try { latch.await(); // Called by main thread blocking it until latch is released. } catch (InterruptedException e) { throw new RuntimeException(e); } } @Override public void onNext(Page<ProductCatalog> productCatalogPage) { // 8. Collect all the ProductCatalog instances in the page, then ask the publisher for one more page. itemsFromAllPages.addAll(productCatalogPage.items()); subscription.request(1L); } @Override public void onError(Throwable throwable) { } @Override public void onComplete() { latch.countDown(); // Call by subscription thread; latch releases. } List<ProductCatalog> getSubscribedItems() { return this.itemsFromAllPages; } }
次のスニペット例では、コメント行 6 以降に Consumer
を受け入れる PagePublisher.subscribe
メソッドのバージョンを使用しています。Java ラムダパラメータはページを消費し、各アイテムをさらに処理します。この例では、各ページが処理され、各ページのアイテムがソートされてからログに記録されます。
// 6. Use a Consumer to work through each page. pagePublisher.subscribe(page -> page .items().stream() .sorted(Comparator.comparing(ProductCatalog::price)) .forEach(item -> logger.info(item.toString()))) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
PagePublisher
の items
メソッドはモデルインスタンスをアンラップして、コードでアイテムを直接処理できるようにします。このアプローチは次のスニペットに示されています。
// 7. Use a Consumer to work through each ProductCatalog item. pagePublisher.items() .subscribe(product -> logger.info(product.toString())) .exceptionally(failure -> { logger.error("ERROR - ", failure); return null; }) .join(); // If needed, blocks the subscribe() method thread until it is finished processing.
テーブルに対してクエリを実行する
DynamoDbTable
クラスの query()
@DynamoDbPartitionKey
注釈とオプションの @DynamoDbSortKey
注釈を使用して、データクラスのプライマリキーを定義します。
query()
メソッドには、指定された値と一致するアイテムを見つけるためのパーティションキー値が必要です。テーブルにソートキーも定義されている場合は、結果を微調整するための追加比較条件として、その値をクエリに追加することができます。
結果の処理以外は、同期バージョンも非同期バージョンも query()
の動作は同じです。scan
API と同様に、query
API は同期呼び出しの場合は PageIterable
を返し、非同期呼び出しの場合は PagePublisher
を返します。PageIterable
と PagePublisher
の使用方法については、前回のスキャンセクションで説明しました。
Query
メソッドの例
以下の query()
メソッドコード例では MovieActor
クラスを使用しています。データクラスは、パーティションキーの movie
属性とソートキーの actor
属性で構成される複合プライマリキーを定義します。
このクラスは、acting_award_year
という名前のグローバルセカンダリインデックスを使用していることも通知します。インデックスの複合プライマリキーは、パーティションキーの actingaward
属性とソートキーの actingyear
属性で構成されます。このトピックの後半で、インデックスの作成方法と使用方法を示すときには、acting_award_year
インデックスを参照します。
package org.example.tests.model; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbAttribute; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbBean; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondaryPartitionKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSecondarySortKey; import software.amazon.awssdk.enhanced.dynamodb.mapper.annotations.DynamoDbSortKey; import java.util.Objects; @DynamoDbBean public class MovieActor implements Comparable<MovieActor> { private String movieName; private String actorName; private String actingAward; private Integer actingYear; private String actingSchoolName; @DynamoDbPartitionKey @DynamoDbAttribute("movie") public String getMovieName() { return movieName; } public void setMovieName(String movieName) { this.movieName = movieName; } @DynamoDbSortKey @DynamoDbAttribute("actor") public String getActorName() { return actorName; } public void setActorName(String actorName) { this.actorName = actorName; } @DynamoDbSecondaryPartitionKey(indexNames = "acting_award_year") @DynamoDbAttribute("actingaward") public String getActingAward() { return actingAward; } public void setActingAward(String actingAward) { this.actingAward = actingAward; } @DynamoDbSecondarySortKey(indexNames = {"acting_award_year", "movie_year"}) @DynamoDbAttribute("actingyear") public Integer getActingYear() { return actingYear; } public void setActingYear(Integer actingYear) { this.actingYear = actingYear; } @DynamoDbAttribute("actingschoolname") public String getActingSchoolName() { return actingSchoolName; } public void setActingSchoolName(String actingSchoolName) { this.actingSchoolName = actingSchoolName; } @Override public String toString() { final StringBuffer sb = new StringBuffer("MovieActor{"); sb.append("movieName='").append(movieName).append('\''); sb.append(", actorName='").append(actorName).append('\''); sb.append(", actingAward='").append(actingAward).append('\''); sb.append(", actingYear=").append(actingYear); sb.append(", actingSchoolName='").append(actingSchoolName).append('\''); sb.append('}'); return sb.toString(); } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; MovieActor that = (MovieActor) o; return Objects.equals(movieName, that.movieName) && Objects.equals(actorName, that.actorName) && Objects.equals(actingAward, that.actingAward) && Objects.equals(actingYear, that.actingYear) && Objects.equals(actingSchoolName, that.actingSchoolName); } @Override public int hashCode() { return Objects.hash(movieName, actorName, actingAward, actingYear, actingSchoolName); } @Override public int compareTo(MovieActor o) { if (this.movieName.compareTo(o.movieName) != 0){ return this.movieName.compareTo(o.movieName); } else { return this.actorName.compareTo(o.actorName); } } }
以下のコード例では、以下の項目に対してクエリを実行します。
MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'} MovieActor{movieName='movie02', actorName='actor0', actingAward='actingaward0', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor1', actingAward='actingaward1', actingYear=2002, actingSchoolName='actingschool1'} MovieActor{movieName='movie02', actorName='actor2', actingAward='actingaward2', actingYear=2002, actingSchoolName='actingschool2'} MovieActor{movieName='movie02', actorName='actor3', actingAward='actingaward3', actingYear=2002, actingSchoolName='null'} MovieActor{movieName='movie02', actorName='actor4', actingAward='actingaward4', actingYear=2002, actingSchoolName='actingschool4'} MovieActor{movieName='movie03', actorName='actor0', actingAward='actingaward0', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor1', actingAward='actingaward1', actingYear=2003, actingSchoolName='actingschool1'} MovieActor{movieName='movie03', actorName='actor2', actingAward='actingaward2', actingYear=2003, actingSchoolName='actingschool2'} MovieActor{movieName='movie03', actorName='actor3', actingAward='actingaward3', actingYear=2003, actingSchoolName='null'} MovieActor{movieName='movie03', actorName='actor4', actingAward='actingaward4', actingYear=2003, actingSchoolName='actingschool4'}
次のコードでは、2 つのQueryConditionalQueryConditionals
使用するかのいずれかのキー値を使用し、DynamoDB サービス API のキー条件式に対応します。この例では、1 行目のコメントの後、パーティション値が movie01
の項目と一致するkeyEqual
インスタンスを定義しています。
この例では、コメント行 2 の後に actingschoolname
が何もない項目を除外するフィルター式も定義しています。
コメント行 3 の後、コードが DynamoDbTable.query()
メソッドに渡すQueryEnhancedRequest
public static void query(DynamoDbTable movieActorTable) { // 1. Define a QueryConditional instance to return items matching a partition value. QueryConditional keyEqual = QueryConditional.keyEqualTo(b -> b.partitionValue("movie01")); // 1a. Define a QueryConditional that adds a sort key criteria to the partition value criteria. QueryConditional sortGreaterThanOrEqualTo = QueryConditional.sortGreaterThanOrEqualTo(b -> b.partitionValue("movie01").sortValue("actor2")); // 2. Define a filter expression that filters out items whose attribute value is null. final Expression filterOutNoActingschoolname = Expression.builder().expression("attribute_exists(actingschoolname)").build(); // 3. Build the query request. QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(keyEqual) .filterExpression(filterOutNoActingschoolname) .build(); // 4. Perform the query. PageIterable<MovieActor> pagedResults = movieActorTable.query(tableQuery); logger.info("page count: {}", pagedResults.stream().count()); // Log number of pages. pagedResults.items().stream() .sorted() .forEach( item -> logger.info(item.toString()) // Log the sorted list of items. );
以下は、メソッドを実行したときの出力です。出力には movie01 の movieName
値を持つ項目が表示され、null
と等しい actingSchoolName
の項目は表示されません。
2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:11:05 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}
コメント行 3 の後に示した次のクエリリクエストのバリエーションでは、コードは keyEqual
QueryConditional
をコメント行 1a の後に定義された sortGreaterThanOrEqualTo
QueryConditional
に置き換えます。次のコードではフィルター式も削除されています。
QueryEnhancedRequest tableQuery = QueryEnhancedRequest.builder() .queryConditional(sortGreaterThanOrEqualTo)
このテーブルには複合プライマリキーがあるため、すべての QueryConditional
インスタンスにパーティションキー値が必要です。sort...
で始まる QueryConditional
メソッドは、ソートキーが必要であることを示します。結果はソートされません。
このクエリでは次の出力が生成されます。このクエリは、movie01 と等しい movieName
値を持つアイテムを返し、actor2 以上の actorName
値のアイテムのみを返します。フィルターが削除されたため、このクエリでは actingSchoolName
属性に値がないアイテムが返されます。
2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:46 - page count: 1 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} 2023-03-05 13:15:00 [main] INFO org.example.tests.QueryDemo:51 - MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}