将 DynamoDB 用作 LangGraph 代理的检查点存储
LangGraph
langgraph-checkpoint-aws 程序包提供了一个用于实现 LangGraph 检查点接口的 DynamoDBSaver 类,使您能够对大型检查点使用可选的 Amazon Simple Storage Service 卸载来在 DynamoDB 中保持代理状态。
主要 功能
- 状态持久性
-
在每个步骤之后自动保存代理状态,使代理能够从中断中恢复并从故障中恢复。
- 基于生存时间的清理
-
使用 DynamoDB 生存时间来管理存储成本,自动使旧检查点过期。
- 压缩
-
(可选)使用 gzip 压缩检查点数据,以降低存储成本并提高吞吐量。
- Amazon S3 卸载
-
自动将大型检查点(大于 350 KB)卸载到 Amazon Simple Storage Service,以便在 DynamoDB 项目大小限制内工作。
- 同步和异步支持
-
同步和异步 API,用于在不同的应用程序架构中实现灵活性。
先决条件
-
Python 3.10 或更高版本
-
一个 AWS 账户,具有创建 DynamoDB 表(以及可选的 Amazon S3 存储桶)的权限
-
配置了 AWS 凭证(有关凭证设置选项,请参阅 AWS 文档)
重要
本指南创建的 AWS 资源可能会产生费用。默认情况下,DynamoDB 使用按请求支付计费,如果您启用大型检查点卸载功能,则会收取 Amazon S3 费用。完成后,按照清理部分以删除资源。
安装
从 PyPI 安装检查点程序包:
pip install langgraph-checkpoint-aws
基本用法
以下示例演示如何将 DynamoDB 配置为 LangGraph 代理的检查点存储:
from langgraph.graph import StateGraph from langgraph_checkpoint_aws import DynamoDBSaver from typing import TypedDict # Define your state schema class State(TypedDict): input: str result: str # Initialize the DynamoDB checkpoint saver checkpointer = DynamoDBSaver( table_name="langgraph-checkpoints", region_name="us-east-1" ) # Build your LangGraph workflow builder = StateGraph(State) builder.add_node("process", lambda state: {"result": "processed"}) builder.set_entry_point("process") builder.set_finish_point("process") # Compile the graph with the DynamoDB checkpointer graph = builder.compile(checkpointer=checkpointer) # Invoke the graph with a thread ID to enable state persistence config = {"configurable": {"thread_id": "session-123"}} result = graph.invoke({"input": "data"}, config)
配置中的 thread_id 充当 DynamoDB 中的分区键,支持您维护单独的对话线程并检索任何线程的历史状态。
生产配置
对于生产部署,您可以启用生存时间、压缩和 Amazon S3 卸载。也可以使用 endpoint_url 参数来指向本地 DynamoDB 实例以进行测试:
import boto3 from botocore.config import Config from langgraph_checkpoint_aws import DynamoDBSaver # Production configuration session = boto3.Session( profile_name="production", region_name="us-east-1" ) checkpointer = DynamoDBSaver( table_name="langgraph-checkpoints", session=session, ttl_seconds=86400 * 7, # Expire checkpoints after 7 days enable_checkpoint_compression=True, # Enable gzip compression boto_config=Config( retries={"mode": "adaptive", "max_attempts": 6}, max_pool_connections=50 ), s3_offload_config={ "bucket_name": "my-checkpoint-bucket" } ) # Local testing with DynamoDB Local local_checkpointer = DynamoDBSaver( table_name="langgraph-checkpoints", region_name="us-east-1", endpoint_url="http://localhost:8000" )
DynamoDB 表配置
检查点保护程序需要一个具有复合主键的 DynamoDB 表。使用以下 AWS CloudFormation 模板创建表:
AWSTemplateFormatVersion: '2010-09-09' Description: 'DynamoDB table for LangGraph checkpoint storage' Parameters: TableName: Type: String Default: langgraph-checkpoints Resources: CheckpointTable: Type: AWS::DynamoDB::Table DeletionPolicy: Retain UpdateReplacePolicy: Retain Properties: TableName: !Ref TableName BillingMode: PAY_PER_REQUEST AttributeDefinitions: - AttributeName: PK AttributeType: S - AttributeName: SK AttributeType: S KeySchema: - AttributeName: PK KeyType: HASH - AttributeName: SK KeyType: RANGE TimeToLiveSpecification: AttributeName: ttl Enabled: true PointInTimeRecoverySpecification: PointInTimeRecoveryEnabled: true SSESpecification: SSEEnabled: true
使用 AWS CLI 部署模板:
aws cloudformation deploy \ --template-file template.yaml \ --stack-name langgraph-checkpoint \ --parameter-overrides TableName=langgraph-checkpoints
所需的 IAM 权限
以下 IAM 策略提供了 DynamoDB 检查点保护程序所需的最小权限。将 111122223333 替换为您的 AWS 账户 ID,并更新区域以匹配您的环境。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query", "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/langgraph-checkpoints" } ] }
如果启用 Amazon S3 卸载,请在策略中添加以下语句:
{ "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:DeleteObject", "s3:PutObjectTagging" ], "Resource": "arn:aws:s3:::my-checkpoint-bucket/*" }, { "Effect": "Allow", "Action": [ "s3:GetBucketLifecycleConfiguration", "s3:PutBucketLifecycleConfiguration" ], "Resource": "arn:aws:s3:::my-checkpoint-bucket" }
异步使用
对于异步应用程序,请使用检查点保护程序提供的异步方法:
import asyncio from langgraph.graph import StateGraph from langgraph_checkpoint_aws import DynamoDBSaver from typing import TypedDict class State(TypedDict): input: str result: str async def main(): checkpointer = DynamoDBSaver( table_name="langgraph-checkpoints", region_name="us-east-1" ) builder = StateGraph(State) builder.add_node("process", lambda state: {"result": "processed"}) builder.set_entry_point("process") builder.set_finish_point("process") graph = builder.compile(checkpointer=checkpointer) config = {"configurable": {"thread_id": "async-session-123"}} result = await graph.ainvoke({"input": "data"}, config) return result asyncio.run(main())
清理
为避免持续产生费用,请删除您创建的资源:
# Delete the DynamoDB table aws dynamodb delete-table --table-name langgraph-checkpoints # Delete the CloudFormation stack (if you used the template above) aws cloudformation delete-stack --stack-name langgraph-checkpoint # If you created an S3 bucket for large checkpoint offloading, empty and delete it aws s3 rm s3://my-checkpoint-bucket --recursive aws s3 rb s3://my-checkpoint-bucket
错误处理
常见错误情景: