DynamoDB は、最大 25 の PutItem
リクエストと DeleteItem
リクエストを単一のバッチで実行できる、BatchWriteItem
などのバッチオペレーションをサポートしています。ただし、BatchWriteItem
は UpdateItem
オペレーションはサポートしておらず、条件式もサポートしていません。この回避策として、最大 100 のバッチサイズには、TransactWriteItems
などのその他の DynamoDB API を使用できます。
これ以上の項目が含まれており、大量のデータの変更が必要な場合は、AWS Glue、Amazon EMR AWS Step Functions などのサービスを使用するか、DynamoDB シェルなどのカスタムスクリプトやツールを使用すると、効率的に一括更新できます。
このパターンを使用すべきケース
DynamoDB シェルは、本番稼働のユースケースではサポートされていません。
TransactWriteItems
– 条件付きまたは条件なしの最大 100 の個別の更新。all-or-nothing の ACID バンドルとして実行されます。アプリケーションでべき等性が必要な場合、つまり複数の同一の呼び出しが 1 回の呼び出しと同じ効果を持つような場合は、TransactWriteItems
コールにClientRequestToken
を指定することもできます。これにより、同じトランザクションを複数回実行してもデータの状態が正しくないという状況を避けることができます。トレードオフ – 追加のスループットが使用されます。標準の 1 KB の書き込みあたり 1 WGU ではなく、1 KB の書き込みあたり 2 WCU になります。
PartiQL
BatchExecuteStatement
– 条件付きまたは条件なしで、最大 25 件の更新。BatchExecuteStatement
は常にリクエスト全体に成功応答を返し、順序を維持する個別のオペレーション応答のリストも返します。トレードオフ – バッチが大きい場合、リクエストを 25 のバッチで分散するために、クライアント側で追加のロジックが必要です。再試行戦略を決定するには、個別のエラーレスポンスを考慮する必要があります。
コードの例
これらのコード例では、AWS SDK for Python である boto3 ライブラリを使用しています。これらの例では、boto3 がインストール済みで、適切な AWS 認証情報で設定されていることを前提としています。
ヨーロッパの都市に複数の倉庫がある電気機器ベンダーの在庫データベースがあるとします。現在は夏の終わりであるため、ベンダーはデスク用ファンを処分して他の製品の在庫スペースを確保したいと考えています。ベンダーは、イタリアの倉庫から供給されたすべてのデスク用ファンについて、デスク用ファンの予備在庫が 20 ある場合に限り、価格の割引を提供することを検討しています。DynamoDB テーブル名は、inventory であり、このテーブルには、各製品の一意の識別子であるパーティションキー sku のキースキーマと、倉庫の識別子であるソートキー warehouse があります。
以下の Python コードは、BatchExecuteStatement
API コールを使用して、この条件付きバッチ更新を実行する方法を説明しています。
import boto3
client=boto3.client("dynamodb")
before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])
response=client.batch_execute_statement(
Statements=[
{'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITTUR1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
{'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
{'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
{'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITROM5'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
{'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN1'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
{'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN2'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
{'Statement': 'UPDATE inventory SET price=price-5 WHERE sku=? AND warehouse=? AND quantity > 20', 'Parameters': [{'S':'F123'}, {'S':'WITVEN3'}], 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'},
],
ReturnConsumedCapacity='TOTAL'
)
after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
実行すると、サンプルデータに以下の出力が生成されます。
Before update: [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}]
After update: [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '40'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '33'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '35'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '38'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '30'}, 'sku': {'S': 'F123'}}]
これは内部システムの境界オペレーションであるため、べき等性の要件は考慮されていません。価格更新のような追加のガードレールを配置できるのは、価格が 35 より大きく 40 より小さい場合のみとするなど、更新をより堅牢にすることができます。
または、べき等性や ACID 要件をより厳密にする要件の場合は、TransactWriteItems
を使用して同じバッチ更新オペレーションを実行できます。ただし、トランザクションバンドル内のすべてのオペレーションが完了するか、バンドル全体が失敗する、という点に注意する必要があります。
イタリアで熱波が発生し、デスク用ファンの需要が急増したと仮定します。ベンダーは、イタリアのすべての倉庫から出荷されるデスク用ファンのコストを 20 ユーロ引き上げたいと考えていますが、規制機関がこのコスト上昇を許可するのは、現在のコストが在庫全体で 70 ユーロ未満の場合にのみです。在庫全体で価格が一度に一度のみ更新され、更新されるのは各倉庫のコストが 70 ユーロ未満の場合にのみである天が重要です。
次の Python コードは、TransactWriteItems
API コールを使用して、このバッチ更新を実行する方法を説明しています。
import boto3
client=boto3.client("dynamodb")
before_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("Before update: ", before_image['Items'])
response=client.transact_write_items(
ClientRequestToken='UUIDAWS124',
TransactItems=[
{'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITTUR1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
{'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
{'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
{'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITROM5'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
{'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN1'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
{'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN2'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
{'Update': { 'Key': {'sku': {'S':'F123'}, 'warehouse': {'S':'WITVEN3'}}, 'UpdateExpression': 'SET price = price + :inc', 'ConditionExpression': 'price < :cap', 'ExpressionAttributeValues': { ':inc': {'N': '20'}, ':cap': {'N': '70'}}, 'TableName': 'inventory', 'ReturnValuesOnConditionCheckFailure': 'ALL_OLD'}},
],
ReturnConsumedCapacity='TOTAL'
)
after_image=client.query(TableName='inventory', KeyConditionExpression='sku=:pk_val AND begins_with(warehouse, :sk_val)', ExpressionAttributeValues={':pk_val':{'S':'F123'},':sk_val':{'S':'WIT'}}, ProjectionExpression='sku,warehouse,quantity,price')
print("After update: ", after_image['Items'])
実行すると、サンプルデータに以下の出力が生成されます。
Before update: [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '60'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '53'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '55'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '58'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '50'}, 'sku': {'S': 'F123'}}]
After update: [{'quantity': {'N': '20'}, 'warehouse': {'S': 'WITROM1'}, 'price': {'N': '80'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '25'}, 'warehouse': {'S': 'WITROM2'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '28'}, 'warehouse': {'S': 'WITROM5'}, 'price': {'N': '73'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '26'}, 'warehouse': {'S': 'WITTUR1'}, 'price': {'N': '75'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '10'}, 'warehouse': {'S': 'WITVEN1'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '20'}, 'warehouse': {'S': 'WITVEN2'}, 'price': {'N': '78'}, 'sku': {'S': 'F123'}}, {'quantity': {'N': '50'}, 'warehouse': {'S': 'WITVEN3'}, 'price': {'N': '70'}, 'sku': {'S': 'F123'}}]
DynamoDB でバッチ更新を実行する方法は、複数あります。適切なアプローチは、ACID やべき等性の要件、更新する項目数、API の知識などの要因によって異なります。