例とユースケース
Lambda の耐久性のある関数により、ステップや待機などの耐久性のあるオペレーションを使用して、耐障害性のある複数ステップのアプリケーションを構築できます。自動チェックポイントおよびチェックポイント再生モデルにより、障害の後に実行が最初から再開始されても完了したチェックポイントがスキップされる場合、進行状況を失わずに、関数は障害から復旧して実行を再開できます。
短期間の耐障害性プロセス
耐久性のある関数を使用して、通常は数分で完了する信頼性の高いオペレーションを構築します。これらのプロセスは長時間のワークフローよりも短くなりますが、分散システム全体で自動チェックポイントおよび耐障害性を活用できます。耐久性のある関数により、複雑なエラー処理や状態管理コードを必要とせず、個々のサービス呼び出しが失敗しても、複数ステップのプロセスが正常に完了します。
一般的なシナリオにはホテルの予約システム、レストランの予約プラットフォーム、ライドシェアの旅行リクエスト、イベントのチケット購入、SaaS サブスクリプションのアップグレードなどが含まれます。これらのシナリオには共通の特性があります。これには、まとまって完了する必要がある複数のサービス呼び出し、一過性の障害に対する自動再試行の必要性、分散システム全体で一貫した状態を維持する要件があります。
マイクロサービス間の分散トランザクション
障害時の自動ロールバックにより、複数のサービス間で支払い、インベントリ、配送を調整します。各サービスオペレーションはステップでラップされ、トランザクションでサービスに障害が発生したらどこからでも復旧できます。
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, amount, items } = event;
// Reserve inventory across multiple warehouses
const inventory = await context.step("reserve-inventory", async () => {
return await inventoryService.reserve(items);
});
// Process payment
const payment = await context.step("process-payment", async () => {
return await paymentService.charge(amount);
});
// Create shipment
const shipment = await context.step("create-shipment", async () => {
return await shippingService.createShipment(orderId, inventory);
});
return { orderId, status: 'completed', shipment };
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
amount = event['amount']
items = event['items']
# Reserve inventory across multiple warehouses
inventory = context.step(
lambda _: inventory_service.reserve(items),
name='reserve-inventory'
)
# Process payment
payment = context.step(
lambda _: payment_service.charge(amount),
name='process-payment'
)
# Create shipment
shipment = context.step(
lambda _: shipping_service.create_shipment(order_id, inventory),
name='create-shipment'
)
return {'orderId': order_id, 'status': 'completed', 'shipment': shipment}
いずれかのステップが失敗した場合、関数によって最後に成功したチェックポイントから自動的に再試行されます。支払い処理が一時的に失敗しても、インベントリ予約は保持されます。関数が再試行されると、完了したインベントリステップがスキップされ、支払い処理に直接進みます。重複する予約がなくなり、分散システム全体で一貫した状態が確保されます。
複数のステップによる注文処理
検証、支払い承認、インベントリ割り当て、自動再試行および復旧による履行を通じて注文を処理します。個々のステップが失敗して再試行されても、各ステップはチェックポイントされ、順序が進行されるようにします。
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId, items } = event;
// Validate order details
const validation = await context.step("validate-order", async () => {
const customer = await customerService.validate(customerId);
const itemsValid = await inventoryService.validateItems(items);
return { customer, itemsValid };
});
if (!validation.itemsValid) {
return { orderId, status: 'rejected', reason: 'invalid_items' };
}
// Authorize payment
const authorization = await context.step("authorize-payment", async () => {
return await paymentService.authorize(
validation.customer.paymentMethod,
calculateTotal(items)
);
});
// Allocate inventory
const allocation = await context.step("allocate-inventory", async () => {
return await inventoryService.allocate(items);
});
// Fulfill order
const fulfillment = await context.step("fulfill-order", async () => {
return await fulfillmentService.createShipment({
orderId,
items: allocation.allocatedItems,
address: validation.customer.shippingAddress
});
});
return {
orderId,
status: 'completed',
trackingNumber: fulfillment.trackingNumber
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
items = event['items']
# Validate order details
def validate_order(_):
customer = customer_service.validate(customer_id)
items_valid = inventory_service.validate_items(items)
return {'customer': customer, 'itemsValid': items_valid}
validation = context.step(validate_order, name='validate-order')
if not validation['itemsValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_items'}
# Authorize payment
authorization = context.step(
lambda _: payment_service.authorize(
validation['customer']['paymentMethod'],
calculate_total(items)
),
name='authorize-payment'
)
# Allocate inventory
allocation = context.step(
lambda _: inventory_service.allocate(items),
name='allocate-inventory'
)
# Fulfill order
fulfillment = context.step(
lambda _: fulfillment_service.create_shipment({
'orderId': order_id,
'items': allocation['allocatedItems'],
'address': validation['customer']['shippingAddress']
}),
name='fulfill-order'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': fulfillment['trackingNumber']
}
このパターンにより、注文が中間状態で停止することはありません。検証が失敗した場合、支払い承認の前に注文が拒否されます。支払い承認が失敗した場合、インベントリは割り当てられません。各ステップは、自動再試行および復旧によって前のステップに基づいて構築されます。
注記
条件付きチェックの if (!validation.itemsValid) はステップ外にあり、再生中に再実行されます。これは決定的であるため安全です。同じ検証オブジェクトを前提として、常に同じ結果が生成されます。
長時間実行中のプロセス
数時間、数日、数週間に及ぶプロセスには、耐久性のある関数を使用します。待機オペレーションはコンピューティング料金を発生させずに実行を停止するため、長時間のプロセスはコスト効率が優れています。待機期間中、関数は実行を停止して Lambda は実行環境をリサイクルします。再開するとき、Lambda は関数を再度呼び出して最後のチェックポイントから再生します。
人間の決定、外部システムの応答、スケジュールされた処理ウィンドウ、時間ベースの遅延の待機を問わず、この実行モデルによって耐久性のある関数は長期間一時停止する必要があるプロセスには最適です。待機時間ではなく、アクティブなコンピューティング時間に対してのみ支払います。
一般的なシナリオには、ドキュメント承認プロセス、スケジュールされたバッチ処理、複数日間のオンボーディングプロセス、サブスクリプションのトライアルプロセス、遅延通知システムなどが含まれます。これらのシナリオには共通の特性があります。これには、数時間または数日で測定される長い待機期間、その待機時間にわたって実行状態を維持する必要性、アイドル状態のコンピューティング時間に莫大な支払い金額が発生するコスト重視の要件などがあります。
ヒューマンインザループ承認
実行状態が維持されながら、ドキュメントのレビュー、承認、決定の実行が一時停止されます。この関数はリソースを消費せずに外部コールバックを待機し、承認されると自動的に再開されます。
このパターンは、人間の判断や外部検証が必要なプロセスに不可欠です。関数はコールバックポイントで停止され、待機中にコンピューティング料金は発生しません。誰かが API を介して決定を送信すると、Lambda によって関数が再度呼び出されてチェックポイントから再生され、承認結果で続行されます。
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { documentId, reviewers } = event;
// Step 1: Prepare document for review
const prepared = await context.step("prepare-document", async () => {
return await documentService.prepare(documentId);
});
// Step 2: Request approval with callback
const approval = await context.waitForCallback(
"approval-callback",
async (callbackId) => {
await notificationService.sendApprovalRequest({
documentId,
reviewers,
callbackId,
expiresIn: 86400
});
},
{
timeout: { seconds: 86400 }
}
);
// Function resumes here when approval is received
if (approval?.approved) {
const finalized = await context.step("finalize-document", async () => {
return await documentService.finalize(documentId, approval.comments);
});
return {
status: 'approved',
documentId,
finalizedAt: finalized.timestamp
};
}
// Handle rejection
await context.step("archive-rejected", async () => {
await documentService.archive(documentId, approval?.reason);
});
return {
status: 'rejected',
documentId,
reason: approval?.reason
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
@durable_execution
def lambda_handler(event, context: DurableContext):
document_id = event['documentId']
reviewers = event['reviewers']
# Step 1: Prepare document for review
prepared = context.step(
lambda _: document_service.prepare(document_id),
name='prepare-document'
)
# Step 2: Request approval with callback
def send_approval_request(callback_id):
notification_service.send_approval_request({
'documentId': document_id,
'reviewers': reviewers,
'callbackId': callback_id,
'expiresIn': 86400
})
approval = context.wait_for_callback(
send_approval_request,
name='approval-callback',
config=WaitConfig(timeout=86400)
)
# Function resumes here when approval is received
if approval and approval.get('approved'):
finalized = context.step(
lambda _: document_service.finalize(document_id, approval.get('comments')),
name='finalize-document'
)
return {
'status': 'approved',
'documentId': document_id,
'finalizedAt': finalized['timestamp']
}
# Handle rejection
context.step(
lambda _: document_service.archive(document_id, approval.get('reason') if approval else None),
name='archive-rejected'
)
return {
'status': 'rejected',
'documentId': document_id,
'reason': approval.get('reason') if approval else None
}
コールバックが受信されて関数が再開されると、最初から再生されます。prepare-document のステップでは、チェックポイントされた結果が即時に返されます。waitForCallback オペレーションでは、再度待機せずに、保存された承認結果とともに即時に返されます。その後、実行は確定またはアーカイブのステップに進みます。
マルチステージデータのパイプライン
ステージ間のチェックポイントを使用して、抽出、変換、読み込みフェーズによって大量のデータセットを処理します。各ステージは完了するまで数時間かかる場合があります。中断された場合、チェックポイントはパイプラインがどのステージでも再開できるようにします。
このパターンは ETL ワークフロー、データ移行、ステージ間に復旧ポイントがある状態でデータを段階的に処理する必要があるバッチ処理ジョブに最適です。ステージが失敗した場合、パイプラインは最初から再開始されるのではなく、最後に完了したステージから再開されます。待機オペレーションを使用してステージ間で一時停止することもできます。レート制限の遵守、ダウンストリームシステムの準備の待機、オフピーク時に処理のスケジューリングを行えます。
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize } = event;
// Stage 1: Extract data from source
const extracted = await context.step("extract-data", async () => {
const records = await sourceDatabase.extractRecords(datasetId);
return { recordCount: records.length, records };
});
// Wait 5 minutes to respect source system rate limits
await context.wait({ seconds: 300 });
// Stage 2: Transform data in batches
const transformed = await context.step("transform-data", async () => {
const batches = chunkArray(extracted.records, batchSize);
const results = [];
for (const batch of batches) {
const transformed = await transformService.processBatch(batch);
results.push(transformed);
}
return { batchCount: batches.length, results };
});
// Wait until off-peak hours (e.g., 2 AM)
const now = new Date();
const targetHour = 2;
const msUntilTarget = calculateMsUntilHour(now, targetHour);
await context.wait({ seconds: Math.floor(msUntilTarget / 1000) });
// Stage 3: Load data to destination
const loaded = await context.step("load-data", async () => {
let loadedCount = 0;
for (const result of transformed.results) {
await destinationDatabase.loadBatch(result);
loadedCount += result.length;
}
return { loadedCount };
});
// Stage 4: Verify and finalize
const verified = await context.step("verify-pipeline", async () => {
const verification = await destinationDatabase.verifyRecords(datasetId);
await pipelineService.markComplete(datasetId, verification);
return verification;
});
return {
datasetId,
recordsProcessed: extracted.recordCount,
batchesProcessed: transformed.batchCount,
recordsLoaded: loaded.loadedCount,
verified: verified.success
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event['batchSize']
# Stage 1: Extract data from source
def extract_data(_):
records = source_database.extract_records(dataset_id)
return {'recordCount': len(records), 'records': records}
extracted = context.step(extract_data, name='extract-data')
# Wait 5 minutes to respect source system rate limits
context.wait(300)
# Stage 2: Transform data in batches
def transform_data(_):
batches = chunk_array(extracted['records'], batch_size)
results = []
for batch in batches:
transformed = transform_service.process_batch(batch)
results.append(transformed)
return {'batchCount': len(batches), 'results': results}
transformed = context.step(transform_data, name='transform-data')
# Wait until off-peak hours (e.g., 2 AM)
now = datetime.now()
target_hour = 2
ms_until_target = calculate_ms_until_hour(now, target_hour)
context.wait(ms_until_target // 1000)
# Stage 3: Load data to destination
def load_data(_):
loaded_count = 0
for result in transformed['results']:
destination_database.load_batch(result)
loaded_count += len(result)
return {'loadedCount': loaded_count}
loaded = context.step(load_data, name='load-data')
# Stage 4: Verify and finalize
def verify_pipeline(_):
verification = destination_database.verify_records(dataset_id)
pipeline_service.mark_complete(dataset_id, verification)
return verification
verified = context.step(verify_pipeline, name='verify-pipeline')
return {
'datasetId': dataset_id,
'recordsProcessed': extracted['recordCount'],
'batchesProcessed': transformed['batchCount'],
'recordsLoaded': loaded['loadedCount'],
'verified': verified['success']
}
各ステージはステップでラップされ、中断された場合にパイプラインがどのステージでも再開できるチェックポイントが作成されます。抽出と変換の間の 5 分待機は、コンピューティングリソースを消費せずにソースシステムのレート制限を遵守しますが、午前 2 時までの待機はオフピーク時に高価な読み込みオペレーションをスケジューリングします。
注記
new Date() 呼び出しおよび calculateMsUntilHour() 関数はステップ外にあり、再生中に再実行されます。再生間で一貫性を持たせる必要がある時間ベースのオペレーションには、ステップ内のタイムスタンプを計算するか、待機時間 (チェックポイントの対象になる) にのみ使用します。
関数の連鎖呼び出し
context.invoke() を使用して、耐久性のある関数内で他の Lambda 関数を呼び出します。呼び出される関数が完了するまで、呼び出し元の関数は待機しながら停止し、結果を保持するチェックポイントが作成されます。呼び出される関数が完了した後に呼び出し元の関数が中断された場合、関数を再呼び出しせずに、保存された結果で再開されます。
特定のドメイン (顧客検証、支払い処理、インベントリ管理) を処理する特殊な関数を持って、ワークフローで調整する必要があるときに、このパターンを使用します。各関数は独自のロジックを維持し、複数のオーケストレーター関数で呼び出すことができるため、コードの重複を回避できます。
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
// Main orchestrator function
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { orderId, customerId } = event;
// Step 1: Validate customer by invoking customer service function
const customer = await context.invoke(
"validate-customer",
"arn:aws:lambda:us-east-1:123456789012:function:customer-service:1",
{ customerId }
);
if (!customer.isValid) {
return { orderId, status: "rejected", reason: "invalid_customer" };
}
// Step 2: Check inventory by invoking inventory service function
const inventory = await context.invoke(
"check-inventory",
"arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1",
{ orderId, items: event.items }
);
if (!inventory.available) {
return { orderId, status: "rejected", reason: "insufficient_inventory" };
}
// Step 3: Process payment by invoking payment service function
const payment = await context.invoke(
"process-payment",
"arn:aws:lambda:us-east-1:123456789012:function:payment-service:1",
{
customerId,
amount: inventory.totalAmount,
paymentMethod: customer.paymentMethod
}
);
// Step 4: Create shipment by invoking fulfillment service function
const shipment = await context.invoke(
"create-shipment",
"arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1",
{
orderId,
items: inventory.allocatedItems,
address: customer.shippingAddress
}
);
return {
orderId,
status: "completed",
trackingNumber: shipment.trackingNumber,
estimatedDelivery: shipment.estimatedDelivery
};
}
);
- Python
-
from aws_durable_execution_sdk_python import DurableContext, durable_execution
# Main orchestrator function
@durable_execution
def lambda_handler(event, context: DurableContext):
order_id = event['orderId']
customer_id = event['customerId']
# Step 1: Validate customer by invoking customer service function
customer = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:customer-service:1',
{'customerId': customer_id},
name='validate-customer'
)
if not customer['isValid']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'invalid_customer'}
# Step 2: Check inventory by invoking inventory service function
inventory = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:inventory-service:1',
{'orderId': order_id, 'items': event['items']},
name='check-inventory'
)
if not inventory['available']:
return {'orderId': order_id, 'status': 'rejected', 'reason': 'insufficient_inventory'}
# Step 3: Process payment by invoking payment service function
payment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:payment-service:1',
{
'customerId': customer_id,
'amount': inventory['totalAmount'],
'paymentMethod': customer['paymentMethod']
},
name='process-payment'
)
# Step 4: Create shipment by invoking fulfillment service function
shipment = context.invoke(
'arn:aws:lambda:us-east-1:123456789012:function:fulfillment-service:1',
{
'orderId': order_id,
'items': inventory['allocatedItems'],
'address': customer['shippingAddress']
},
name='create-shipment'
)
return {
'orderId': order_id,
'status': 'completed',
'trackingNumber': shipment['trackingNumber'],
'estimatedDelivery': shipment['estimatedDelivery']
}
各呼び出しでは、オーケストレーター関数にチェックポイントが作成されます。顧客検証の完了後にオーケストレーターが中断された場合、保存された顧客データを使用してそのチェックポイントから再開され、検証呼び出しがスキップされます。これにより、ダウンストリームサービスへの重複呼び出しが防止され、中断しても一貫した実行が実現されます。
呼び出される関数は、耐久性のある Lambda 関数または標準の Lambda 関数のいずれかの場合があります。耐久性のある関数を呼び出した場合、待機やチェックポイントを含む独自の複数ステップのワークフローを持つことがあります。オーケストレーターは耐久性のある完全な実行が終了されるまで待ち、最終結果を受け取ります。
クロスアカウントの呼び出しはサポートされていません。呼び出されるすべての関数は、呼び出し元の関数と同じ AWS アカウントに存在する必要があります。
高度なパターン
耐久性のある関数を使用して、複数の耐久性のあるオペレーション、並列実行、配列処理、条件付きロジック、ポーリングを組み合わせた複雑な複数ステップのアプリケーションを構築します。これらのパターンにより、耐障害性および自動復旧を維持しながら、多くのタスクを調整する高度なアプリケーションを構築できます。
高度なパターンは、単純なシーケンシャルステップを超えます。オペレーションは parallel() と同時に実行し、map() で配列を処理し、waitForCondition() で外部条件を待機し、これらのプリミティブを組み合わせて信頼性の高いアプリケーションを構築できます。耐久性のある各オペレーションに独自のチェックポイントが作成されるため、アプリケーションが中断されたらどこからでも復旧できます。
ユーザーのオンボーディングプロセス
登録、E メール認証、プロフィール設定、再試行処理による初期設定を通じてユーザーを案内します。この例では、シーケンシャルステップ、コールバック、条件付きロジックを組み合わせて、完全なオンボーディングプロセスを作成します。
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { userId, email } = event;
// Step 1: Create user account
const user = await context.step("create-account", async () => {
return await userService.createAccount(userId, email);
});
// Step 2: Send verification email
await context.step("send-verification", async () => {
return await emailService.sendVerification(email);
});
// Step 3: Wait for email verification (up to 48 hours)
const verified = await context.waitForCallback(
"email-verification",
async (callbackId) => {
await notificationService.sendVerificationLink({
email,
callbackId,
expiresIn: 172800
});
},
{
timeout: { seconds: 172800 }
}
);
if (!verified) {
await context.step("send-reminder", async () => {
await emailService.sendReminder(email);
});
return {
status: "verification_timeout",
userId,
message: "Email verification not completed within 48 hours"
};
}
// Step 4: Initialize user profile in parallel
const setupResults = await context.parallel("profile-setup", [
async (ctx: DurableContext) => {
return await ctx.step("create-preferences", async () => {
return await preferencesService.createDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("setup-notifications", async () => {
return await notificationService.setupDefaults(userId);
});
},
async (ctx: DurableContext) => {
return await ctx.step("create-welcome-content", async () => {
return await contentService.createWelcome(userId);
});
}
]);
// Step 5: Send welcome email
await context.step("send-welcome", async () => {
const [preferences, notifications, content] = setupResults.getResults();
return await emailService.sendWelcome({
email,
preferences,
notifications,
content
});
});
return {
status: "onboarding_complete",
userId,
completedAt: new Date().toISOString()
};
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, WaitConfig
from datetime import datetime
@durable_execution
def lambda_handler(event, context: DurableContext):
user_id = event['userId']
email = event['email']
# Step 1: Create user account
user = context.step(
lambda _: user_service.create_account(user_id, email),
name='create-account'
)
# Step 2: Send verification email
context.step(
lambda _: email_service.send_verification(email),
name='send-verification'
)
# Step 3: Wait for email verification (up to 48 hours)
def send_verification_link(callback_id):
notification_service.send_verification_link({
'email': email,
'callbackId': callback_id,
'expiresIn': 172800
})
verified = context.wait_for_callback(
send_verification_link,
name='email-verification',
config=WaitConfig(timeout=172800)
)
if not verified:
context.step(
lambda _: email_service.send_reminder(email),
name='send-reminder'
)
return {
'status': 'verification_timeout',
'userId': user_id,
'message': 'Email verification not completed within 48 hours'
}
# Step 4: Initialize user profile in parallel
def create_preferences(ctx: DurableContext):
return ctx.step(
lambda _: preferences_service.create_defaults(user_id),
name='create-preferences'
)
def setup_notifications(ctx: DurableContext):
return ctx.step(
lambda _: notification_service.setup_defaults(user_id),
name='setup-notifications'
)
def create_welcome_content(ctx: DurableContext):
return ctx.step(
lambda _: content_service.create_welcome(user_id),
name='create-welcome-content'
)
setup_results = context.parallel(
[create_preferences, setup_notifications, create_welcome_content],
name='profile-setup'
)
# Step 5: Send welcome email
def send_welcome(_):
results = setup_results.get_results()
preferences, notifications, content = results[0], results[1], results[2]
return email_service.send_welcome({
'email': email,
'preferences': preferences,
'notifications': notifications,
'content': content
})
context.step(send_welcome, name='send-welcome')
return {
'status': 'onboarding_complete',
'userId': user_id,
'completedAt': datetime.now().isoformat()
}
このプロセスでは、アカウント作成と E メール検証のためにシーケンシャルステップとチェックポイントを組み合わせたら、リソースを消費せずに E メール認証を待機するため、最大 48 時間一時停止します。検証の完了またはタイムアウトに基づき、条件付きロジックは異なるパスを処理します。プロフィールのセットアップタスクは並列オペレーションを使用して同時に実行され、合計実行時間を短縮します。各ステップでは、一過性の障害時に自動的に再試行され、オンボーディングを確実に完了させます。
チェックポイントを使用したバッチ処理
障害の後に最後に成功したチェックポイントでの自動復旧により、数百万件のレコードを処理します。この例では、耐久性のある関数が map() オペレーションをチャンキングおよびレート制限と組み合わて、大規模なデータ処理に対応する方法が示されます。
- TypeScript
-
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
interface Batch {
batchIndex: number;
recordIds: string[];
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const { datasetId, batchSize = 1000 } = event;
// Step 1: Get all record IDs to process
const recordIds = await context.step("fetch-record-ids", async () => {
return await dataService.getRecordIds(datasetId);
});
// Step 2: Split into batches
const batches: Batch[] = [];
for (let i = 0; i < recordIds.length; i += batchSize) {
batches.push({
batchIndex: Math.floor(i / batchSize),
recordIds: recordIds.slice(i, i + batchSize)
});
}
// Step 3: Process batches with controlled concurrency
const batchResults = await context.map(
"process-batches",
batches,
async (ctx: DurableContext, batch: Batch, index: number) => {
const processed = await ctx.step(`batch-${batch.batchIndex}`, async () => {
const results = [];
for (const recordId of batch.recordIds) {
const result = await recordService.process(recordId);
results.push(result);
}
return results;
});
const validated = await ctx.step(`validate-${batch.batchIndex}`, async () => {
return await validationService.validateBatch(processed);
});
return {
batchIndex: batch.batchIndex,
recordCount: batch.recordIds.length,
successCount: validated.successCount,
failureCount: validated.failureCount
};
},
{
maxConcurrency: 5
}
);
// Step 4: Aggregate results
const summary = await context.step("aggregate-results", async () => {
const results = batchResults.getResults();
const totalSuccess = results.reduce((sum, r) => sum + r.successCount, 0);
const totalFailure = results.reduce((sum, r) => sum + r.failureCount, 0);
return {
datasetId,
totalRecords: recordIds.length,
batchesProcessed: batches.length,
successCount: totalSuccess,
failureCount: totalFailure,
completedAt: new Date().toISOString()
};
});
return summary;
}
);
- Python
-
from aws_durable_execution_sdk import DurableContext, durable_execution, MapConfig
from datetime import datetime
from typing import List, Dict
@durable_execution
def lambda_handler(event, context: DurableContext):
dataset_id = event['datasetId']
batch_size = event.get('batchSize', 1000)
# Step 1: Get all record IDs to process
record_ids = context.step(
lambda _: data_service.get_record_ids(dataset_id),
name='fetch-record-ids'
)
# Step 2: Split into batches
batches = []
for i in range(0, len(record_ids), batch_size):
batches.append({
'batchIndex': i // batch_size,
'recordIds': record_ids[i:i + batch_size]
})
# Step 3: Process batches with controlled concurrency
def process_batch(ctx: DurableContext, batch: Dict, index: int):
batch_index = batch['batchIndex']
def process_records(_):
results = []
for record_id in batch['recordIds']:
result = record_service.process(record_id)
results.append(result)
return results
processed = ctx.step(process_records, name=f'batch-{batch_index}')
validated = ctx.step(
lambda _: validation_service.validate_batch(processed),
name=f'validate-{batch_index}'
)
return {
'batchIndex': batch_index,
'recordCount': len(batch['recordIds']),
'successCount': validated['successCount'],
'failureCount': validated['failureCount']
}
batch_results = context.map(
process_batch,
batches,
name='process-batches',
config=MapConfig(max_concurrency=5)
)
# Step 4: Aggregate results
def aggregate_results(_):
results = batch_results.get_results()
total_success = sum(r['successCount'] for r in results)
total_failure = sum(r['failureCount'] for r in results)
return {
'datasetId': dataset_id,
'totalRecords': len(record_ids),
'batchesProcessed': len(batches),
'successCount': total_success,
'failureCount': total_failure,
'completedAt': datetime.now().isoformat()
}
summary = context.step(aggregate_results, name='aggregate-results')
return summary
レコードは管理可能なバッチに分割され、メモリやダウンストリームサービスの圧迫を回避します。その後に複数のバッチが並列処理され、maxConcurrency によって同時処理が制御されます。各バッチには独自のチェックポイントがあるため、失敗はすべてのレコードで再処理されるのではなく、失敗したバッチのみで再試行されます。このパターンは処理に数時間かかる可能性がある ETL ジョブ、データ移行、一括操作に最適です。
次のステップ