Steps¶
Checkpointed results¶
A step executes the code you provide and checkpoints the result. On replay, the SDK returns the checkpointed result rather than re-running the code inside the step.
Use steps to encapsulate any code that should not re-run once it has completed.
Wrapping non-deterministic code in steps is the primary way you ensure that your durable execution is deterministic. Non-deterministic code includes fetching the current time, generating a random number or UUID, causing side-effects such as writing to disk, or calling an API that might return a different result on different calls.
When you encapsulate such code in a step it becomes deterministic in your durable execution because the step doesn’t generate different results on replay.
If a step fails during execution, it retries according to its configured retry strategy. The step will checkpoint the last error after exhausting all retry attempts.
import {
DurableContext,
StepContext,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
async function addNumbers(ctx: StepContext, a: number, b: number): Promise<number> {
return a + b;
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const result = await context.step("add_numbers", (ctx) => addNumbers(ctx, 5, 3));
return result;
},
);
from aws_durable_execution_sdk_python import (
DurableContext,
StepContext,
durable_execution,
durable_step,
)
@durable_step
def add_numbers(step_context: StepContext, a: int, b: int) -> int:
return a + b
@durable_execution
def handler(event: dict, context: DurableContext) -> int:
result = context.step(add_numbers(5, 3))
return result
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.StepContext;
public class AddNumbersExample extends DurableHandler<Object, Integer> {
@Override
public Integer handleRequest(Object input, DurableContext context) {
int result = context.step("add_numbers", Integer.class,
(StepContext ctx) -> 5 + 3);
return result;
}
}
Method signature¶
step¶
import { DurableContext, StepConfig, StepFunc } from "@aws/durable-execution-sdk-js";
// Sync signature (unnamed)
context.step(fn: StepFunc<T>, config?: StepConfig<T>): DurablePromise<T>
// Named signature
context.step(name: string | undefined, fn: StepFunc<T>, config?: StepConfig<T>): DurablePromise<T>
Parameters:
name(optional) A name for the step. Passundefinedto omit.fnA function that receives aStepContextand returns aPromise<T>.config(optional) AStepConfig<T>object.
Returns: DurablePromise<T>. Use await to get the result.
Throws: DurableOperationError wrapping the original error after retries are
exhausted. StepInterruptedError if an at-most-once step was interrupted.
from aws_durable_execution_sdk_python import StepContext
from aws_durable_execution_sdk_python.config import StepConfig
def step(
func: Callable[[StepContext], T],
name: str | None = None,
config: StepConfig | None = None,
) -> T: ...
Parameters:
funcA callable that receives aStepContextand returnsT.name(optional) A name for the step. Defaults to the function's name when using@durable_step.config(optional) AStepConfigobject.
Returns: T, the return value of func.
Raises: CallableRuntimeError wrapping the original exception after retries are
exhausted. StepInterruptedError if an at-most-once step was interrupted.
// Sync (blocks until complete)
<T> T step(String name, Class<T> resultType, Function<StepContext, T> func)
<T> T step(String name, Class<T> resultType, Function<StepContext, T> func, StepConfig config)
// Async (returns a DurableFuture)
<T> DurableFuture<T> stepAsync(String name, Class<T> resultType, Function<StepContext, T> func)
<T> DurableFuture<T> stepAsync(String name, Class<T> resultType, Function<StepContext, T> func, StepConfig config)
Parameters:
name(required) A name for the step.resultTypeTheClass<T>orTypeToken<T>for deserialization.funcAFunction<StepContext, T>to execute.config(optional) AStepConfigobject.
Returns: T (sync) or DurableFuture<T> (async via stepAsync()).
Throws: The original exception re-thrown after deserialization if possible,
otherwise StepFailedException. StepInterruptedException if an at-most-once step was
interrupted.
StepConfig¶
interface StepConfig<T> {
retryStrategy?: (error: Error, attemptCount: number) => RetryDecision;
semantics?: StepSemantics;
serdes?: Serdes<T>;
}
Parameters:
retryStrategy(optional) A function returning aRetryDecision. UsecreateRetryStrategy()to build one. See Retry strategies.semantics(optional)StepSemantics.AtLeastOncePerRetry(default) orStepSemantics.AtMostOncePerRetry.serdes(optional) CustomSerdes<T>for the step result. See Serialization.
@dataclass(frozen=True)
class StepConfig:
retry_strategy: Callable[[Exception, int], RetryDecision] | None = None
step_semantics: StepSemantics = StepSemantics.AT_LEAST_ONCE_PER_RETRY
serdes: SerDes | None = None
Parameters:
retry_strategy(optional) A callable returning aRetryDecision. Usecreate_retry_strategy()to build one. See Retry strategies.step_semantics(optional)StepSemantics.AT_LEAST_ONCE_PER_RETRY(default) orStepSemantics.AT_MOST_ONCE_PER_RETRY.serdes(optional) CustomSerDesfor the step result. See Serialization.
StepConfig.builder()
.retryStrategy(RetryStrategy) // optional
.semantics(StepSemantics) // optional
.serDes(SerDes) // optional
.build()
Parameters:
retryStrategy(optional) ARetryStrategyinstance. UseRetryStrategies.exponentialBackoff()to build one. See Retry strategies.semantics(optional)StepSemantics.AT_LEAST_ONCE_PER_RETRY(default) orStepSemantics.AT_MOST_ONCE_PER_RETRY.serDes(optional) CustomSerDesfor the step result. See Serialization.
StepContext¶
loggerA logger enriched with execution context metadata. See Logging.
loggerA logger enriched with execution context metadata. See Logging.
interface StepContext {
DurableLogger getLogger();
int getAttempt(); // current retry attempt, 0-based
boolean isReplaying();
}
getLogger()A logger enriched with execution context metadata. See Logging.getAttempt()The current retry attempt number (0-based).isReplaying()Whether the function is currently replaying from a checkpoint.
StepSemantics¶
enum StepSemantics {
AtLeastOncePerRetry = "AT_LEAST_ONCE_PER_RETRY",
AtMostOncePerRetry = "AT_MOST_ONCE_PER_RETRY",
}
AtLeastOncePerRetry(default) Re-executes the step if the function replays before the result is checkpointed. Safe for idempotent operations.AtMostOncePerRetryExecutes the step at most once per retry attempt. If the function replays before the result is checkpointed, the SDK skips the step and raisesStepInterruptedError. Use for operations with side effects.
class StepSemantics(Enum):
AT_LEAST_ONCE_PER_RETRY = "AT_LEAST_ONCE_PER_RETRY"
AT_MOST_ONCE_PER_RETRY = "AT_MOST_ONCE_PER_RETRY"
AT_LEAST_ONCE_PER_RETRY(default) Re-execute the step if the function replays before the result has checkpointed. Safe for idempotent operations.AT_MOST_ONCE_PER_RETRYExecute the step at most once per retry attempt. If the function replays before the result has checkpointed, the SDK skips the step and raisesStepInterruptedError. Use for operations with side effects.
AT_LEAST_ONCE_PER_RETRY(default) Re-executes the step if the function replays before the result is checkpointed. Safe for idempotent operations.AT_MOST_ONCE_PER_RETRYExecutes the step at most once per retry attempt. If the function replays before the result is checkpointed, the SDK skips the step and throwsStepInterruptedException. Use for operations with side effects.
The Step's function¶
A step function receives a StepContext as its first parameter.
Pass any async function directly.
import {
DurableContext,
StepContext,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
async function validateOrder(ctx: StepContext, orderId: string): Promise<object> {
return { order_id: orderId, valid: true };
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const orderId = event.order_id;
const validation = await context.step("validate_order", (ctx) =>
validateOrder(ctx, orderId),
);
return validation;
},
);
Step functions are async. await the result of context.step().
Use the @durable_step decorator. It automatically uses the function's name as the step
name. Step functions must be synchronous.
from aws_durable_execution_sdk_python import (
DurableContext,
StepContext,
durable_execution,
durable_step,
)
from aws_durable_execution_sdk_python.config import StepConfig
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
create_retry_strategy,
)
@durable_step
def validate_order(step_context: StepContext, order_id: str) -> dict:
return {"order_id": order_id, "valid": True}
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
order_id = event["order_id"]
validation = context.step(validate_order(order_id))
return validation
Pass a lambda or method reference directly. Step functions are synchronous. Use
stepAsync() to get a DurableFuture<T> you can compose with other async operations.
import java.util.Map;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.StepContext;
public class ValidateOrderExample extends DurableHandler<Map<String, Object>, Map<String, Object>> {
@Override
public Map<String, Object> handleRequest(Map<String, Object> event, DurableContext context) {
String orderId = (String) event.get("order_id");
Map<String, Object> validation = context.step("validate_order", Map.class,
(StepContext ctx) -> Map.of("order_id", orderId, "valid", true));
return validation;
}
}
Anonymous step functions¶
You can also use inline lambdas.
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
// Lambda without a name — no automatic name
const result = await context.step(async () => "some value");
return result;
},
);
If you use an anonymous function it will not automatically get named like the
@durable_step decorator does.
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.StepContext;
public class LambdaStepNoNameExample extends DurableHandler<Object, String> {
@Override
public String handleRequest(Object input, DurableContext context) {
// Java requires a name — use a descriptive string
String result = context.step("my_step", String.class,
(StepContext ctx) -> "some value");
return result;
}
}
Pass arguments to the step function¶
Capture arguments in a closure:
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
async function myStep(arg1: string, arg2: number): Promise<string> {
return `${arg1}: ${arg2}`;
}
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const result = await context.step("my_step", async () => myStep("value", 42));
return result;
},
);
Use @durable_step and pass arguments when calling the function:
Capture arguments in a lambda:
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.StepContext;
public class MultiArgumentStepExample extends DurableHandler<Object, String> {
@Override
public String handleRequest(Object input, DurableContext context) {
String arg1 = "value";
int arg2 = 42;
String result = context.step("my_step", String.class,
(StepContext ctx) -> arg1 + ": " + arg2);
return result;
}
}
Naming steps¶
Name your steps so they're easy to identify in logs and tests. Use descriptive names that explain what the step does. Names don't need to be unique, but distinct names make debugging easier.
The name is the first argument. Pass undefined to omit it.
The @durable_step decorator uses the function's name automatically as the step name.
Override it with the name keyword argument.
The name is always the first argument. Pass null for no name.
Configuration¶
Configure step behavior using StepConfig:
import {
DurableContext,
StepConfig,
StepSemantics,
createRetryStrategy,
withDurableExecution,
} from "@aws/durable-execution-sdk-js";
async function processData(data: string): Promise<object> {
return { processed: data, status: "completed" };
}
const stepConfig: StepConfig<object> = {
retryStrategy: createRetryStrategy({
maxAttempts: 3,
}),
semantics: StepSemantics.AtLeastOncePerRetry,
};
export const handler = withDurableExecution(
async (event: any, context: DurableContext) => {
const result = await context.step(
"process_data",
async () => processData(event.data),
stepConfig,
);
return result;
},
);
from aws_durable_execution_sdk_python import (
DurableContext,
StepContext,
durable_execution,
durable_step,
)
from aws_durable_execution_sdk_python.config import StepConfig, StepSemantics
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
create_retry_strategy,
)
@durable_step
def process_data(step_context: StepContext, data: str) -> dict:
return {"processed": data, "status": "completed"}
@durable_execution
def handler(event: dict, context: DurableContext) -> dict:
retry_config = RetryStrategyConfig(
max_attempts=3,
retryable_error_types=[RuntimeError, ValueError],
)
step_config = StepConfig(
retry_strategy=create_retry_strategy(retry_config),
step_semantics=StepSemantics.AT_LEAST_ONCE_PER_RETRY,
)
result = context.step(process_data(event["data"]), config=step_config)
return result
import java.util.Map;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
import software.amazon.lambda.durable.StepContext;
import software.amazon.lambda.durable.config.StepConfig;
import software.amazon.lambda.durable.config.StepSemantics;
import software.amazon.lambda.durable.retry.RetryStrategies;
public class ProcessDataExample extends DurableHandler<Map<String, Object>, Map<String, Object>> {
@Override
public Map<String, Object> handleRequest(Map<String, Object> event, DurableContext context) {
StepConfig config = StepConfig.builder()
.retryStrategy(RetryStrategies.exponentialBackoff(
3,
java.time.Duration.ofSeconds(1),
java.time.Duration.ofMinutes(5),
2.0,
software.amazon.lambda.durable.retry.JitterStrategy.FULL))
.semantics(StepSemantics.AT_LEAST_ONCE_PER_RETRY)
.build();
Map<String, Object> result = context.step("process_data", Map.class,
(StepContext ctx) -> Map.of("processed", event.get("data"), "status", "completed"),
config);
return result;
}
}
Pass data between steps¶
Pass data between steps through return values. Do not use shared variables or closure mutations. Steps return cached results on replay, so mutations to outer variables are lost.
wrong way to pass data between steps¶
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
async function registerUser(email: string): Promise<string> {
return `user-${email}`;
}
async function sendFollowUpEmail(userId: string): Promise<void> {
// send email to user
}
// ❌ WRONG: userId mutation is lost on replay after the wait
export const handler = withDurableExecution(
async (event: { email: string }, context: DurableContext) => {
let userId = "";
await context.step("register-user", async () => {
userId = await registerUser(event.email); // ⚠️ Lost on replay!
});
await context.wait("follow-up-delay", { minutes: 10 });
await context.step("send-follow-up-email", async () => {
await sendFollowUpEmail(userId); // userId is "" on replay
});
},
);
from aws_durable_execution_sdk_python import (
DurableContext,
StepContext,
durable_execution,
durable_step,
)
from aws_durable_execution_sdk_python.config import Duration
def register_user(email: str) -> str:
return f"user-{email}"
def send_follow_up_email(user_id: str) -> None:
# send email to user
pass
# ❌ WRONG: user_id mutation is lost on replay after the wait
@durable_execution
def handler(event: dict, context: DurableContext) -> None:
user_id = ""
@durable_step
def do_register(step_context: StepContext) -> None:
nonlocal user_id
user_id = register_user(event["email"]) # ⚠️ Lost on replay!
context.step(do_register())
context.wait(Duration.from_minutes(10), name="follow-up-delay")
@durable_step
def do_send(step_context: StepContext) -> None:
send_follow_up_email(user_id) # user_id is "" on replay
context.step(do_send())
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
public class PassingDataWrongExample extends DurableHandler<Map<String, String>, Void> {
private String registerUser(String email) {
return "user-" + email;
}
private void sendFollowUpEmail(String userId) {
// send email to user
}
@Override
public Void handleRequest(Map<String, String> event, DurableContext context) {
// ❌ WRONG: userId mutation is lost on replay after the wait
AtomicReference<String> userId = new AtomicReference<>("");
context.step("register-user", String.class, ctx -> {
userId.set(registerUser(event.get("email"))); // ⚠️ Lost on replay!
return userId.get();
});
context.wait("follow-up-delay", Duration.ofMinutes(10));
context.step("send-follow-up-email", Void.class, ctx -> {
sendFollowUpEmail(userId.get()); // userId is "" on replay
return null;
});
return null;
}
}
correct way to pass data between steps¶
import { DurableContext, withDurableExecution } from "@aws/durable-execution-sdk-js";
async function registerUser(email: string): Promise<string> {
return `user-${email}`;
}
async function sendFollowUpEmail(userId: string): Promise<void> {
// send email to user
}
// ✅ CORRECT: userId is returned from the step and restored from checkpoint on replay
export const handler = withDurableExecution(
async (event: { email: string }, context: DurableContext) => {
const userId = await context.step("register-user", async () => {
return await registerUser(event.email);
});
await context.wait("follow-up-delay", { minutes: 10 });
await context.step("send-follow-up-email", async () => {
await sendFollowUpEmail(userId); // userId restored from checkpoint
});
},
);
from aws_durable_execution_sdk_python import (
DurableContext,
StepContext,
durable_execution,
durable_step,
)
from aws_durable_execution_sdk_python.config import Duration
def register_user(email: str) -> str:
return f"user-{email}"
def send_follow_up_email(user_id: str) -> None:
# send email to user
pass
# ✅ CORRECT: user_id is returned from the step and restored from checkpoint on replay
@durable_execution
def handler(event: dict, context: DurableContext) -> None:
@durable_step
def do_register(step_context: StepContext) -> str:
return register_user(event["email"])
user_id = context.step(do_register())
context.wait(Duration.from_minutes(10), name="follow-up-delay")
@durable_step
def do_send(step_context: StepContext) -> None:
send_follow_up_email(user_id) # user_id restored from checkpoint
context.step(do_send())
import java.time.Duration;
import java.util.Map;
import software.amazon.lambda.durable.DurableContext;
import software.amazon.lambda.durable.DurableHandler;
public class PassingDataCorrectExample extends DurableHandler<Map<String, String>, Void> {
private String registerUser(String email) {
return "user-" + email;
}
private void sendFollowUpEmail(String userId) {
// send email to user
}
@Override
public Void handleRequest(Map<String, String> event, DurableContext context) {
// ✅ CORRECT: userId is returned from the step and restored from checkpoint on replay
String userId = context.step("register-user", String.class,
ctx -> registerUser(event.get("email")));
context.wait("follow-up-delay", Duration.ofMinutes(10));
context.step("send-follow-up-email", Void.class, ctx -> {
sendFollowUpEmail(userId); // userId restored from checkpoint
return null;
});
return null;
}
}
Nesting steps¶
You cannot nest steps. Do not attempt to invoke another step from inside a step. If you want to group or nest operations, use a child context.
Concurrency¶
Do not run steps concurrently. For concurrent operations, see map and parallel.
To code your own concurrency use a child context to encapsulate each concurrent code path.