테이블 및 인덱스 스캔: Java
Scan
작업은 Amazon DynamoDB에서 테이블 또는 인덱스의 모든 항목을 읽어옵니다.
다음은 AWS SDK for Java Document API를 사용하여 테이블을 스캔하는 단계입니다.
-
AmazonDynamoDB
클래스의 인스턴스를 만듭니다. -
ScanRequest
클래스 인스턴스를 생성하여 스캔 파라미터를 입력합니다.이때는 테이블 이름 파라미터만 있으면 됩니다.
-
scan
메서드를 실행하고 이전 단계에서 생성한ScanRequest
객체를 입력합니다.
다음은 포럼 스레드 댓글을 저장하는 Reply
테이블입니다.
예
Reply ( Id, ReplyDateTime, Message, PostedBy )
이 테이블에는 다양한 포럼 스레드의 댓글이 모두 저장됩니다. 따라서 기본 키도 Id
(파티션 키)와 ReplyDateTime
(정렬 키), 두 가지로 구성됩니다. 다음은 테이블 전체를 스캔하는 Java 코드 예제입니다. ScanRequest
인스턴스는 스캔할 테이블 이름을 지정합니다.
예
AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build(); ScanRequest scanRequest = new ScanRequest() .withTableName("Reply"); ScanResponse result = client.scan(scanRequest); for (Map<String, AttributeValue> item : result.getItems()){ printItem(item); }
옵션 파라미터 지정
scan
메서드는 여러 가지 선택적 파라미터를 지원합니다. 예를 들어 옵션으로 필터 표현식을 사용하여 스캔 결과를 필터링할 수 있습니다. 필터 표현식을 사용할 때는 필터 조건과 평가할 조건의 속성 이름 및 값을 지정합니다. 자세한 내용은 스캔을 참조하세요.
다음 Java 코드는 ProductCatalog
테이블을 스캔하여 가격이 0보다 작은 항목을 찾습니다. 이 예제에서 지정하는 옵션 파라미터는 다음과 같습니다.
-
가격이 0보다 작은 항목만 가져오는 필터 표현식(오류 조건)
-
쿼리 결과로 가져올 항목 속성 목록
예
Map<String, AttributeValue> expressionAttributeValues = new HashMap<String, AttributeValue>(); expressionAttributeValues.put(":val", new AttributeValue().withN("0")); ScanRequest scanRequest = new ScanRequest() .withTableName("ProductCatalog") .withFilterExpression("Price < :val") .withProjectionExpression("Id") .withExpressionAttributeValues(expressionAttributeValues); ScanResponse result = client.scan(scanRequest); for (Map<String, AttributeValue> item : result.getItems()) { printItem(item); }
또한 옵션으로 스캔 요청에서 withLimit
메서드를 사용하여 페이지 크기 또는 페이지당 항목 수를 제한할 수 있습니다. scan
메서드를 실행할 때마다 지정된 수의 항목이 있는 결과 한 페이지를 가져옵니다. 다음 페이지를 가져오려면 이전 페이지의 마지막 항목의 기본 키 값을 제공하여 scan
메서드가 다음 항목 집합을 반환할 수 있도록 scan
메서드를 다시 실행합니다. withExclusiveStartKey
메서드를 사용하여 이 요청 정보를 입력합니다. 이 메서드의 초기 파라미터 값은 null이 될 수 있습니다. 연이어 다음 페이지까지 가져오려면 이 속성 값을 이전 페이지에서 마지막 항목의 기본 키로 업데이트해야 합니다.
다음은 ProductCatalog
테이블을 스캔하는 Java 코드 예제입니다. withLimit
메서드와 withExclusiveStartKey
메서드가 요청에 사용됩니다. do/while
루프는 결과에서 getLastEvaluatedKey
메서드가 null 값을 반환할 때까지 계속해서 한 번에 한 페이지씩 스캔합니다.
예
Map<String, AttributeValue> lastKeyEvaluated = null; do { ScanRequest scanRequest = new ScanRequest() .withTableName("ProductCatalog") .withLimit(10) .withExclusiveStartKey(lastKeyEvaluated); ScanResponse result = client.scan(scanRequest); for (Map<String, AttributeValue> item : result.getItems()){ printItem(item); } lastKeyEvaluated = result.getLastEvaluatedKey(); } while (lastKeyEvaluated != null);
예 - Java를 사용한 스캔
다음 Java 코드 예제는 ProductCatalog
테이블을 스캔하여 가격이 100보다 작은 항목을 찾는 작업 샘플을 제공합니다.
참고
또한 SDK for Java에서는 객체 지속성 모델을 제공하므로 DynamoDB 테이블로 클라이언트 측 클래스를 매핑할 수 있습니다. 이러한 접근 방식을 활용하면 작성해야 할 코드가 줄어듭니다. 자세한 내용은 Java 1.x: DynamoDBMapper 섹션을 참조하세요.
참고
아래 코드 예제는 DynamoDB에서 테이블 생성 및 코드 예시에 대한 데이터 로드 단원의 지침에 따라 이미 계정의 DynamoDB에 데이터를 로드하였다고 가정한 것입니다.
다음 예제를 실행하기 위한 단계별 지침은 Java 코드 예 섹션을 참조하세요.
package com.amazonaws.codesamples.document; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.ItemCollection; import com.amazonaws.services.dynamodbv2.document.ScanOutcome; import com.amazonaws.services.dynamodbv2.document.Table; public class DocumentAPIScan { static AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build(); static DynamoDB dynamoDB = new DynamoDB(client); static String tableName = "ProductCatalog"; public static void main(String[] args) throws Exception { findProductsForPriceLessThanOneHundred(); } private static void findProductsForPriceLessThanOneHundred() { Table table = dynamoDB.getTable(tableName); Map<String, Object> expressionAttributeValues = new HashMap<String, Object>(); expressionAttributeValues.put(":pr", 100); ItemCollection<ScanOutcome> items = table.scan("Price < :pr", // FilterExpression "Id, Title, ProductCategory, Price", // ProjectionExpression null, // ExpressionAttributeNames - not used in this example expressionAttributeValues); System.out.println("Scan of " + tableName + " for items with a price less than 100."); Iterator<Item> iterator = items.iterator(); while (iterator.hasNext()) { System.out.println(iterator.next().toJSONPretty()); } } }
예 - Java를 사용한 병렬 스캔
다음은 병렬 스캔을 설명하는 Java 코드 예제입니다. 프로그램이 ParallelScanTest
라는 이름의 테이블을 삭제했다가 재생성한 후 테이블에 데이터를 로드합니다. 데이터 로드가 끝나면 프로그램이 다수의 스레드를 생성하여 병렬 Scan
요청을 실행합니다. 마지막으로 프로그램은 각 병렬 요청의 런타임 통계를 출력합니다.
참고
또한 SDK for Java에서는 객체 지속성 모델을 제공하므로 DynamoDB 테이블로 클라이언트 측 클래스를 매핑할 수 있습니다. 이러한 접근 방식을 활용하면 작성해야 할 코드가 줄어듭니다. 자세한 내용은 Java 1.x: DynamoDBMapper 섹션을 참조하세요.
참고
아래 코드 예제는 DynamoDB에서 테이블 생성 및 코드 예시에 대한 데이터 로드 단원의 지침에 따라 이미 계정의 DynamoDB에 데이터를 로드하였다고 가정한 것입니다.
다음 예제를 실행하기 위한 단계별 지침은 Java 코드 예 섹션을 참조하세요.
package com.amazonaws.codesamples.document; import java.util.ArrayList; import java.util.Arrays; import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import com.amazonaws.AmazonServiceException; import com.amazonaws.services.dynamodbv2.AmazonDynamoDB; import com.amazonaws.services.dynamodbv2.AmazonDynamoDBClientBuilder; import com.amazonaws.services.dynamodbv2.document.DynamoDB; import com.amazonaws.services.dynamodbv2.document.Item; import com.amazonaws.services.dynamodbv2.document.ItemCollection; import com.amazonaws.services.dynamodbv2.document.ScanOutcome; import com.amazonaws.services.dynamodbv2.document.Table; import com.amazonaws.services.dynamodbv2.document.spec.ScanSpec; import com.amazonaws.services.dynamodbv2.model.AttributeDefinition; import com.amazonaws.services.dynamodbv2.model.KeySchemaElement; import com.amazonaws.services.dynamodbv2.model.KeyType; import com.amazonaws.services.dynamodbv2.model.ProvisionedThroughput; public class DocumentAPIParallelScan { // total number of sample items static int scanItemCount = 300; // number of items each scan request should return static int scanItemLimit = 10; // number of logical segments for parallel scan static int parallelScanThreads = 16; // table that will be used for scanning static String parallelScanTestTableName = "ParallelScanTest"; static AmazonDynamoDB client = AmazonDynamoDBClientBuilder.standard().build(); static DynamoDB dynamoDB = new DynamoDB(client); public static void main(String[] args) throws Exception { try { // Clean up the table deleteTable(parallelScanTestTableName); createTable(parallelScanTestTableName, 10L, 5L, "Id", "N"); // Upload sample data for scan uploadSampleProducts(parallelScanTestTableName, scanItemCount); // Scan the table using multiple threads parallelScan(parallelScanTestTableName, scanItemLimit, parallelScanThreads); } catch (AmazonServiceException ase) { System.err.println(ase.getMessage()); } } private static void parallelScan(String tableName, int itemLimit, int numberOfThreads) { System.out.println( "Scanning " + tableName + " using " + numberOfThreads + " threads " + itemLimit + " items at a time"); ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads); // Divide DynamoDB table into logical segments // Create one task for scanning each segment // Each thread will be scanning one segment int totalSegments = numberOfThreads; for (int segment = 0; segment < totalSegments; segment++) { // Runnable task that will only scan one segment ScanSegmentTask task = new ScanSegmentTask(tableName, itemLimit, totalSegments, segment); // Execute the task executor.execute(task); } shutDownExecutorService(executor); } // Runnable task for scanning a single segment of a DynamoDB table private static class ScanSegmentTask implements Runnable { // DynamoDB table to scan private String tableName; // number of items each scan request should return private int itemLimit; // Total number of segments // Equals to total number of threads scanning the table in parallel private int totalSegments; // Segment that will be scanned with by this task private int segment; public ScanSegmentTask(String tableName, int itemLimit, int totalSegments, int segment) { this.tableName = tableName; this.itemLimit = itemLimit; this.totalSegments = totalSegments; this.segment = segment; } @Override public void run() { System.out.println("Scanning " + tableName + " segment " + segment + " out of " + totalSegments + " segments " + itemLimit + " items at a time..."); int totalScannedItemCount = 0; Table table = dynamoDB.getTable(tableName); try { ScanSpec spec = new ScanSpec().withMaxResultSize(itemLimit).withTotalSegments(totalSegments) .withSegment(segment); ItemCollection<ScanOutcome> items = table.scan(spec); Iterator<Item> iterator = items.iterator(); Item currentItem = null; while (iterator.hasNext()) { totalScannedItemCount++; currentItem = iterator.next(); System.out.println(currentItem.toString()); } } catch (Exception e) { System.err.println(e.getMessage()); } finally { System.out.println("Scanned " + totalScannedItemCount + " items from segment " + segment + " out of " + totalSegments + " of " + tableName); } } } private static void uploadSampleProducts(String tableName, int itemCount) { System.out.println("Adding " + itemCount + " sample items to " + tableName); for (int productIndex = 0; productIndex < itemCount; productIndex++) { uploadProduct(tableName, productIndex); } } private static void uploadProduct(String tableName, int productIndex) { Table table = dynamoDB.getTable(tableName); try { System.out.println("Processing record #" + productIndex); Item item = new Item().withPrimaryKey("Id", productIndex) .withString("Title", "Book " + productIndex + " Title").withString("ISBN", "111-1111111111") .withStringSet("Authors", new HashSet<String>(Arrays.asList("Author1"))).withNumber("Price", 2) .withString("Dimensions", "8.5 x 11.0 x 0.5").withNumber("PageCount", 500) .withBoolean("InPublication", true).withString("ProductCategory", "Book"); table.putItem(item); } catch (Exception e) { System.err.println("Failed to create item " + productIndex + " in " + tableName); System.err.println(e.getMessage()); } } private static void deleteTable(String tableName) { try { Table table = dynamoDB.getTable(tableName); table.delete(); System.out.println("Waiting for " + tableName + " to be deleted...this may take a while..."); table.waitForDelete(); } catch (Exception e) { System.err.println("Failed to delete table " + tableName); e.printStackTrace(System.err); } } private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits, String partitionKeyName, String partitionKeyType) { createTable(tableName, readCapacityUnits, writeCapacityUnits, partitionKeyName, partitionKeyType, null, null); } private static void createTable(String tableName, long readCapacityUnits, long writeCapacityUnits, String partitionKeyName, String partitionKeyType, String sortKeyName, String sortKeyType) { try { System.out.println("Creating table " + tableName); List<KeySchemaElement> keySchema = new ArrayList<KeySchemaElement>(); keySchema.add(new KeySchemaElement().withAttributeName(partitionKeyName).withKeyType(KeyType.HASH)); // Partition // key List<AttributeDefinition> attributeDefinitions = new ArrayList<AttributeDefinition>(); attributeDefinitions .add(new AttributeDefinition().withAttributeName(partitionKeyName) .withAttributeType(partitionKeyType)); if (sortKeyName != null) { keySchema.add(new KeySchemaElement().withAttributeName(sortKeyName).withKeyType(KeyType.RANGE)); // Sort // key attributeDefinitions .add(new AttributeDefinition().withAttributeName(sortKeyName).withAttributeType(sortKeyType)); } Table table = dynamoDB.createTable(tableName, keySchema, attributeDefinitions, new ProvisionedThroughput() .withReadCapacityUnits(readCapacityUnits).withWriteCapacityUnits(writeCapacityUnits)); System.out.println("Waiting for " + tableName + " to be created...this may take a while..."); table.waitForActive(); } catch (Exception e) { System.err.println("Failed to create table " + tableName); e.printStackTrace(System.err); } } private static void shutDownExecutorService(ExecutorService executor) { executor.shutdown(); try { if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { executor.shutdownNow(); } } catch (InterruptedException e) { executor.shutdownNow(); // Preserve interrupt status Thread.currentThread().interrupt(); } } }