分批操作 - AWS SDK for Java 2.x

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

分批操作

DynamoDB 增强型客户端 API 提供两种批处理方法,即 batchGetItem()batchWriteItem()。

batchGetItem() 示例

使用该DynamoDbTable.batchGetItem()方法,您可以在一个总体请求中检索多个表中的多达 100 个单个项目。以下示例使用前面显示的CustomerMovieActor数据类。

在第 1 行和第 2 行之后的示例中,您构建了ReadBatch对象,这些对象稍后作为参数添加到batchGetItem()方法的注释第 3 行之后。注释行 1 之后的代码生成要从Customer表中读取的批处理。注释行 1a 之后的代码显示了使用使用主键值来指定要读取的项目的GetItemEnhancedRequest生成器。与为请求项目指定键值相比,您可以使用数据类来请求项目,如注释行 1b 后所示。在提交请求之前,SDK 会在幕后提取密钥值。

当您使用基于密钥的方法指定项目时,如 2a 之后的两个语句所示,您还可以指定 DynamoDB 应执行一致性读取。使用该consistentRead()方法时,必须将其用于同一表的所有请求项目。

要检索 DynamoDB 找到的项目,请使用注释行 4 后显示resultsForTable() 的方法。为请求中读取的每个表调用方法。 resultsForTable()返回找到的项目列表,您可以使用任何java.util.List方法对其进行处理。此示例记录了每个项目。

要发现 DynamoDB 未处理的项目,请使用注释第 5 行后的方法。该BatchGetResultPage类的unproccessKeysForTable()方法使您可以访问每个未处理的密钥。BatchGetItemAPI 参考提供了有关导致项目未处理的情况的更多信息。

public static void batchGetItemExample(DynamoDbEnhancedClient enhancedClient, DynamoDbTable<Customer> customerTable, DynamoDbTable<MovieActor> movieActorTable) { Customer customer2 = new Customer(); customer2.setId("2"); customer2.setEmail("cust2@example.org"); // 1. Build a batch to read from the Customer table. ReadBatch customerBatch = ReadBatch.builder(Customer.class) .mappedTableResource(customerTable) // 1a. Specify the primary key values for the item. .addGetItem(b -> b.key(k -> k.partitionValue("1").sortValue("cust1@orgname.org"))) // 1b. Alternatively, supply a data class instances to provide the primary key values. .addGetItem(customer2) .build(); // 2. Build a batch to read from the MovieActor table. ReadBatch moveActorBatch = ReadBatch.builder(MovieActor.class) .mappedTableResource(movieActorTable) // 2a. Call consistentRead(Boolean.TRUE) for each item for the same table. .addGetItem(b -> b.key(k -> k.partitionValue("movie01").sortValue("actor1")).consistentRead(Boolean.TRUE)) .addGetItem(b -> b.key(k -> k.partitionValue("movie01").sortValue("actor4")).consistentRead(Boolean.TRUE)) .build(); // 3. Add ReadBatch objects to the request. BatchGetResultPageIterable resultPages = enhancedClient.batchGetItem(b -> b.readBatches(customerBatch, moveActorBatch)); // 4. Retrieve the successfully requested items from each table. resultPages.resultsForTable(customerTable).forEach(item -> logger.info(item.toString())); resultPages.resultsForTable(movieActorTable).forEach(item -> logger.info(item.toString())); // 5. Retrieve the keys of the items requested but not processed by the service. resultPages.forEach((BatchGetResultPage pageResult) -> { pageResult.unprocessedKeysForTable(customerTable).forEach(key -> logger.info("Unprocessed item key: " + key.toString())); pageResult.unprocessedKeysForTable(customerTable).forEach(key -> logger.info("Unprocessed item key: " + key.toString())); }); }

在运行示例代码之前,假设以下项目位于两个表中。

Customer [id=1, name=CustName1, email=cust1@example.org, regDate=2023-03-31T15:46:27.688Z] Customer [id=2, name=CustName2, email=cust2@example.org, regDate=2023-03-31T15:46:28.688Z] Customer [id=3, name=CustName3, email=cust3@example.org, regDate=2023-03-31T15:46:29.688Z] Customer [id=4, name=CustName4, email=cust4@example.org, regDate=2023-03-31T15:46:30.688Z] Customer [id=5, name=CustName5, email=cust5@example.org, regDate=2023-03-31T15:46:31.689Z] MovieActor{movieName='movie01', actorName='actor0', actingAward='actingaward0', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'} MovieActor{movieName='movie01', actorName='actor2', actingAward='actingaward2', actingYear=2001, actingSchoolName='actingschool2'} MovieActor{movieName='movie01', actorName='actor3', actingAward='actingaward3', actingYear=2001, actingSchoolName='null'} MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'}

以下输出显示了注释行 4 之后返回和记录的项目。

Customer [id=1, name=CustName1, email=cust1@example.org, regDate=2023-03-31T15:46:27.688Z] Customer [id=2, name=CustName2, email=cust2@example.org, regDate=2023-03-31T15:46:28.688Z] MovieActor{movieName='movie01', actorName='actor4', actingAward='actingaward4', actingYear=2001, actingSchoolName='actingschool4'} MovieActor{movieName='movie01', actorName='actor1', actingAward='actingaward1', actingYear=2001, actingSchoolName='actingschool1'}

batchWriteItem() 示例

batchWriteItem()方法在一个或多个表中放置或删除多个项目。您最多可以在请求中指定 25 个单独的放置或删除操作。以下示例使用前面显示的ProductCatalogMovieActor模型类。

WriteBatch对象是在注释行 1 和 2 之后生成的。对于ProductCatalog表,代码放置一个项目并删除一个项目。对于注释行 2 之后的MovieActor表,代码放置了两个项目并删除了一个。

batchWriteItem方法在注释行 3 之后调用。该builder参数为每个表提供批量请求。

返回的BatchWriteResult对象为每个操作提供单独的方法来查看未处理的请求。注释行 4a 之后的代码为未处理的删除请求提供密钥,注释行 4b 之后的代码提供未处理的 put 项目。

public static void batchWriteItemExample(DynamoDbEnhancedClient enhancedClient, DynamoDbTable<ProductCatalog> catalogTable, DynamoDbTable<MovieActor> movieActorTable) { // 1. Build a batch to write to the ProductCatalog table. WriteBatch products = WriteBatch.builder(ProductCatalog.class) .mappedTableResource(catalogTable) .addPutItem(b -> b.item(getProductCatItem1())) .addDeleteItem(b -> b.key(k -> k .partitionValue(getProductCatItem2().id()) .sortValue(getProductCatItem2().title()))) .build(); // 2. Build a batch to write to the MovieActor table. WriteBatch movies = WriteBatch.builder(MovieActor.class) .mappedTableResource(movieActorTable) .addPutItem(getMovieActorYeoh()) .addPutItem(getMovieActorBlanchettPartial()) .addDeleteItem(b -> b.key(k -> k .partitionValue(getMovieActorStreep().getMovieName()) .sortValue(getMovieActorStreep().getActorName()))) .build(); // 3. Add WriteBatch objects to the request. BatchWriteResult batchWriteResult = enhancedClient.batchWriteItem(b -> b.writeBatches(products, movies)); // 4. Retrieve keys for items the service did not process. // 4a. 'unprocessedDeleteItemsForTable()' returns keys for delete requests that did not process. if (batchWriteResult.unprocessedDeleteItemsForTable(movieActorTable).size() > 0) { batchWriteResult.unprocessedDeleteItemsForTable(movieActorTable).forEach(key -> logger.info(key.toString())); } // 4b. 'unprocessedPutItemsForTable()' returns keys for put requests that did not process. if (batchWriteResult.unprocessedPutItemsForTable(catalogTable).size() > 0) { batchWriteResult.unprocessedPutItemsForTable(catalogTable).forEach(key -> logger.info(key.toString())); } }

以下辅助方法为放置和删除操作提供模型对象。

public static ProductCatalog getProductCatItem1() { return ProductCatalog.builder() .id(2) .isbn("1-565-85698") .authors(new HashSet<>(Arrays.asList("a", "b"))) .price(BigDecimal.valueOf(30.22)) .title("Title 55") .build(); } public static ProductCatalog getProductCatItem2() { return ProductCatalog.builder() .id(4) .price(BigDecimal.valueOf(40.00)) .title("Title 1") .build(); } public static MovieActor getMovieActorBlanchettPartial() { MovieActor movieActor = new MovieActor(); movieActor.setActorName("Cate Blanchett"); movieActor.setMovieName("Blue Jasmine"); movieActor.setActingYear(2023); movieActor.setActingAward("Best Actress"); return movieActor; } public static MovieActor getMovieActorStreep() { MovieActor movieActor = new MovieActor(); movieActor.setActorName("Meryl Streep"); movieActor.setMovieName("Sophie's Choice"); movieActor.setActingYear(1982); movieActor.setActingAward("Best Actress"); movieActor.setActingSchoolName("Yale School of Drama"); return movieActor; } public static MovieActor getMovieActorYeoh(){ MovieActor movieActor = new MovieActor(); movieActor.setActorName("Michelle Yeoh"); movieActor.setMovieName("Everything Everywhere All at Once"); movieActor.setActingYear(2023); movieActor.setActingAward("Best Actress"); movieActor.setActingSchoolName("Royal Academy of Dance"); return movieActor; }

在运行示例代码之前,假设这些表包含以下项目。

MovieActor{movieName='Blue Jasmine', actorName='Cate Blanchett', actingAward='Best Actress', actingYear=2013, actingSchoolName='National Institute of Dramatic Art'} MovieActor{movieName='Sophie's Choice', actorName='Meryl Streep', actingAward='Best Actress', actingYear=1982, actingSchoolName='Yale School of Drama'} ProductCatalog{id=4, title='Title 1', isbn='orig_isbn', authors=[b, g], price=10}

示例代码完成后,表中包含以下项目。

MovieActor{movieName='Blue Jasmine', actorName='Cate Blanchett', actingAward='Best Actress', actingYear=2013, actingSchoolName='null'} MovieActor{movieName='Everything Everywhere All at Once', actorName='Michelle Yeoh', actingAward='Best Actress', actingYear=2023, actingSchoolName='Royal Academy of Dance'} ProductCatalog{id=2, title='Title 55', isbn='1-565-85698', authors=[a, b], price=30.22}

请注意,在MovieActor表格中,Blue Jasmine电影物品已被通过getMovieActorBlanchettPartial()辅助方法获取的 put 请求中使用的物品所取代。如果未提供 data bean 属性值,则数据库中的值将被删除。这就是为什么Blue Jasmine电影项目的结果actingSchoolName为 null 的原因。

注意

尽管 API 文档建议可以使用条件表达式,并且可以通过单个放入删除请求返回消耗的容量和收集指标,但在批量写入场景中情况并非如此。为了提高批处理操作的性能,这些单独的选项将被忽略。