テーブルおよびインデックスにスキャン: Java - Amazon DynamoDB

テーブルおよびインデックスにスキャン: Java

Scan オペレーションは、Amazon DynamoDB のテーブルまたはインデックス内のすべての項目を読み込みます。

次に、AWS SDK for Java ドキュメント API を使用してテーブルをスキャンするステップを示します。

  1. AmazonDynamoDB クラスのインスタンスを作成します。

  2. ScanRequest クラスのインスタンスを作成して、スキャンパラメータを指定します。

    必須パラメータは、テーブル名のみです。

  3. scan メソッドを実行し、前述のステップで作成した ScanRequest オブジェクトを指定します。

以下の Reply テーブルは、フォーラムスレッドの返信を保存します。

Reply ( Id, ReplyDateTime, Message, PostedBy )

テーブルには、さまざまなフォーラムスレッドに対するすべての返信が保持されます。したがってプライマリキーは、Id (パーティションキー) と ReplyDateTime (ソートキー) の両方で構成されます。以下の Java コード例は、テーブル全体をスキャンします。ScanRequest インスタンスは、スキャンするテーブルの名前を指定します。

AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build(); ScanRequest scanRequest = new ScanRequest() .withTableName("Reply"); ScanResult result = client.scan(scanRequest); for (Map<String, AttributeValue> item : result.getItems()){ printItem(item); }

オプションパラメータの指定

scan メソッドでは、複数のオプションパラメータがサポートされています。たとえば、オプションでフィルター式を使用して、スキャン結果をフィルタリングできます。フィルター式では、条件を評価する条件および属性の名前と値を指定できます。詳細については、スキャンを参照してください。

以下の Java サンプルは、ProductCatalog テーブルをスキャンして、料金が 0 未満の項目を検索します。この例では、次のオプションパラメータが指定されています。

  • 料金が 0 (エラー状態) 未満の項目のみを取得するフィルター式。

  • クエリ結果内の項目について取得する属性のリスト。

Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>(); expressionAttributeValues.put(":val", new AttributeValue().withN("0")); ScanRequest scanRequest = new ScanRequest() .withTableName("ProductCatalog") .withFilterExpression("Price < :val") .withProjectionExpression("Id") .withExpressionAttributeValues(expressionAttributeValues); ScanResult result = client.scan(scanRequest); for (Map<String, AttributeValue> item : result.getItems()) { printItem(item); }

また、スキャンリクエストの withLimit メソッドを使用して、ページのサイズ、またはページあたりの項目数を制限することもできます。scan メソッドを実行するたびに、指定された数の項目を含む結果が 1 ページ取得されます。次のページを取得するには、前ページの最後の項目のプライマリキーバリューを入力して、scan メソッドを再度実行します。そうすることで、scan メソッドが次の項目のセットを返すことができます。この情報は、withExclusiveStartKey メソッドを使用することでリクエストに含めます。最初は、このメソッドのパラメータを null にすることができます。以降のページを取り出すには、このプロパティ値を更新して、前ページの最後の項目のプライマリキーにする必要があります。

以下の Java コード例は、ProductCatalog テーブルをスキャンします。リクエストでは、withLimit および withExclusiveStartKey メソッドが使用されます。do/while ループは、結果の getLastEvaluatedKey メソッドから null 値が返されるまで、一度に 1 ページのスキャンを継続します。

Map<String, AttributeValue> lastKeyEvaluated = null; do { ScanRequest scanRequest = new ScanRequest() .withTableName("ProductCatalog") .withLimit(10) .withExclusiveStartKey(lastKeyEvaluated); ScanResult result = client.scan(scanRequest); for (Map<String, AttributeValue> item : result.getItems()){ printItem(item); } lastKeyEvaluated = result.getLastEvaluatedKey(); } while (lastKeyEvaluated != null);

例 - Java を使用したスキャン

以下の Java コード例は、動作サンプルを提供し、ProductCatalog テーブルをスキャンして、料金が 100 未満の項目を検索します。

注記

SDK for Java には、オブジェクト永続性モデルも用意されています。このモデルにより、クライアント側のクラスを DynamoDB テーブルにマッピングできます。この方法により、記述する必要のあるコードの量を減らすことができます。詳細については、「 」を参照してくださいJava: DynamoDBMapper

注記

このコード例では、アカウントの DynamoDB に対し、DynamoDB でのコード例用のテーブルの作成とデータのロード セクションの手順に従ってデータが既にロードされていることを前提としています。

以下の例を実行するための詳しい手順については、「Java コードの例」を参照してください。

/** * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * This file is licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. A copy of * the License is located at * * http://aws.amazon.com/apache2.0/ * * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR * CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package com.amazonaws.codesamples.document; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.ItemCollection; import com.amazonaws.services.dynamodbv2.document.ScanOutcome; import com.amazonaws.services.dynamodbv2.document.Table; public class DocumentAPIScan { static AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build(); static DynamoDB dynamoDB = new DynamoDB(client); static String tableName = "ProductCatalog"; public static void main(String[] args) throws Exception { findProductsForPriceLessThanOneHundred(); } private static void findProductsForPriceLessThanOneHundred() { Table table = dynamoDB.getTable(tableName); Map<String, Object> expressionAttributeValues = new HashMap<String, Object>(); expressionAttributeValues.put(":pr", 100); ItemCollection<ScanOutcome> items = table.scan("Price < :pr", // FilterExpression "Id, Title, ProductCategory, Price", // ProjectionExpression null, // ExpressionAttributeNames - not used in this example expressionAttributeValues); System.out.println("Scan of " + tableName + " for items with a price less than 100."); Iterator<Item> iterator = items.iterator(); while (iterator.hasNext()) { System.out.println(iterator.next().toJSONPretty()); } } }

例 - Java を使用した並列スキャン

次の Java コードの例は並列スキャン方法を示します。プログラムは、ParallelScanTest という名前のテーブルを削除してから再作成します。その後、テーブルにデータをロードします。データのロードが完了すると、プログラムは複数のスレッドを生成し、Scan リクエストを並列に発行します。プログラムは、各並列リクエストのランタイム統計を出力します。

注記

SDK for Java には、オブジェクト永続性モデルも用意されています。このモデルにより、クライアント側のクラスを DynamoDB テーブルにマッピングできます。この方法により、記述する必要のあるコードの量を減らすことができます。詳細については、「 」を参照してくださいJava: DynamoDBMapper

注記

このコード例では、アカウントの DynamoDB に対し、DynamoDB でのコード例用のテーブルの作成とデータのロード セクションの手順に従ってデータが既にロードされていることを前提としています。

以下の例を実行するための詳しい手順については、「Java コードの例」を参照してください。

/** * Copyright 2010-2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. * * This file is licensed under the Apache License, Version 2.0 (the "License"). * You may not use this file except in compliance with the License. A copy of * the License is located at * * http://aws.amazon.com/apache2.0/ * * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR * CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ package com.amazonaws.codesamples.document; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.ItemCollection; import com.amazonaws.services.dynamodbv2.document.ScanOutcome; import com.amazonaws.services.dynamodbv2.document.Table; import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; public class DocumentAPIParallelScan { // total number of sample items static int scanItemCount = 300; // number of items each scan request should return static int scanItemLimit = 10; // number of logical segments for parallel scan static int parallelScanThreads = 16; // table that will be used for scanning static String parallelScanTestTableName = "ParallelScanTest"; static AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build(); static DynamoDB dynamoDB = new DynamoDB(client); public static void main(String[] args) throws Exception { try { // Clean up the table deleteTable(parallelScanTestTableName); createTable(parallelScanTestTableName, 10L, 5L, "Id", "N"); // Upload sample data for scan uploadSampleProducts(parallelScanTestTableName, scanItemCount); // Scan the table using multiple threads parallelScan(parallelScanTestTableName, scanItemLimit, parallelScanThreads); } catch (AmazonServiceException ase) { System.err.println(ase.getMessage()); } } private static void parallelScan(String tableName, int itemLimit, int numberOfThreads) { System.out.println( "Scanning " + tableName + " using " + numberOfThreads + " threads " + itemLimit + " items at a time"); ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); // Divide DynamoDB table into logical segments // Create one task for scanning each segment // Each thread will be scanning one segment int totalSegments = numberOfThreads; for (int segment = 0; segment < totalSegments; segment++) { // Runnable task that will only scan one segment ScanSegmentTask task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment); // Execute the task executor.execute(task); } shutDownExecutorService(executor); } // Runnable task for scanning a single segment of a DynamoDB table private static class ScanSegmentTask implements Runnable { // DynamoDB table to scan private String tableName; // number of items each scan request should return private int itemLimit; // Total number of segments // Equals to total number of threads scanning the table in parallel private int totalSegments; // Segment that will be scanned with by this task private int segment; public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment) { this.tableName = tableName; this.itemLimit = itemLimit; this.totalSegments = totalSegments; this.segment = segment; } @Override public void run() { System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time..."); int totalScannedItemCount = 0; Table table = dynamoDB.getTable(tableName); try { ScanSpec spec = new ScanSpec().withMaxResultSize(itemLimit).withTotalSegments(totalSegments) .withSegment(segment); ItemCollection<ScanOutcome> items = table.scan(spec); Iterator<Item> iterator = items.iterator(); Item currentItem = null; while (iterator.hasNext()) { totalScannedItemCount++; currentItem = iterator.next(); System.out.println(currentItem.toString()); } } catch (Exception e) { System.err.println(e.getMessage()); } finally { System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment + " out of " + totalSegments + " of " + tableName); } } } private static void uploadSampleProducts(String tableName, int itemCount) { System.out.println("Adding " + itemCount + " sample items to " + tableName); for (int productIndex = 0; productIndex < itemCount; productIndex++) { uploadProduct(tableName, productIndex); } } private static void uploadProduct(String tableName, int productIndex) { Table table = dynamoDB.getTable(tableName); try { System.out.println("Processing record #" + productIndex); Item item = new Item().withPrimaryKey("Id", productIndex) .withString("Title", "Book " + productIndex + " Title").withString("ISBN", "111-1111111111") .withStringSet("Authors", new HashSet<String>(Arrays.asList("Author1"))).withNumber("Price", 2) .withString("Dimensions", "8.5 x 11.0 x 0.5").withNumber("PageCount", 500) .withBoolean("InPublication", true).withString("ProductCategory", "Book"); table.putItem(item); } catch (Exception e) { System.err.println("Failed to create item " + productIndex + " in " + tableName); System.err.println(e.getMessage()); } } private static void deleteTable(String tableName) { try { Table table = dynamoDB.getTable(tableName); table.delete(); System.out.println("Waiting for " + tableName + " to be deleted...this may take a while..."); table.waitForDelete(); } catch (Exception e) { System.err.println("Failed to delete table " + tableName); e.printStackTrace(System.err); } } private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits, String partitionKeyName, String partitionKeyType) { createTable(tableName, readCapacityUnits, writeCapacityUnits, partitionKeyName, partitionKeyType, null, null); } private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits, String partitionKeyName, String partitionKeyType, String sortKeyName, String sortKeyType) { try { System.out.println("Creating table " + tableName); List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName(partitionKeyName).withKeyType(KeyType.HASH)); // Partition // key List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions .add(new AttributeDefinition().withAttributeName(partitionKeyName).withAttributeType(partitionKeyType)); if (sortKeyName != null) { keySchema.add(new KeySchemaElement().withAttributeName(sortKeyName).withKeyType(KeyType.RANGE)); // Sort // key attributeDefinitions .add(new AttributeDefinition().withAttributeName(sortKeyName).withAttributeType(sortKeyType)); } Table table = dynamoDB.createTable(tableName, keySchema, attributeDefinitions, new ProvisionedThroughput() .withReadCapacityUnits(readCapacityUnits).withWriteCapacityUnits(writeCapacityUnits)); System.out.println("Waiting for " + tableName + " to be created...this may take a while..."); table.waitForActive(); } catch (Exception e) { System.err.println("Failed to create table " + tableName); e.printStackTrace(System.err); } } private static void shutDownExecutorService(ExecutorService executor) { executor.shutdown(); try { if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } } }