将 DynamoDB 用作 LangGraph 代理的检查点存储 - Amazon DynamoDB

将 DynamoDB 用作 LangGraph 代理的检查点存储

LangGraph 是一个使用大语言模型(LLM)构建有状态、多参与者人工智能应用程序的框架。LangGraph 代理需要永久存储来维护对话状态、实现人机协同工作流程、支持容错能力以及提供时空旅行调试功能。DynamoDB 的无服务器架构、个位数毫秒延迟和自动扩展等特性使其成为在 AWS 上实现生产 LangGraph 部署的理想检查点存储。

langgraph-checkpoint-aws 程序包提供了一个用于实现 LangGraph 检查点接口的 DynamoDBSaver 类,使您能够对大型检查点使用可选的 Amazon Simple Storage Service 卸载来在 DynamoDB 中保持代理状态。

主要 功能

状态持久性

在每个步骤之后自动保存代理状态,使代理能够从中断中恢复并从故障中恢复。

基于生存时间的清理

使用 DynamoDB 生存时间来管理存储成本,自动使旧检查点过期。

压缩

(可选)使用 gzip 压缩检查点数据,以降低存储成本并提高吞吐量。

Amazon S3 卸载

自动将大型检查点(大于 350 KB)卸载到 Amazon Simple Storage Service,以便在 DynamoDB 项目大小限制内工作。

同步和异步支持

同步和异步 API,用于在不同的应用程序架构中实现灵活性。

先决条件

  • Python 3.10 或更高版本

  • 一个 AWS 账户,具有创建 DynamoDB 表(以及可选的 Amazon S3 存储桶)的权限

  • 配置了 AWS 凭证(有关凭证设置选项,请参阅 AWS 文档)

重要

本指南创建的 AWS 资源可能会产生费用。默认情况下,DynamoDB 使用按请求支付计费,如果您启用大型检查点卸载功能,则会收取 Amazon S3 费用。完成后,按照清理部分以删除资源。

安装

从 PyPI 安装检查点程序包:

pip install langgraph-checkpoint-aws

基本用法

以下示例演示如何将 DynamoDB 配置为 LangGraph 代理的检查点存储:

from langgraph.graph import StateGraph from langgraph_checkpoint_aws import DynamoDBSaver from typing import TypedDict # Define your state schema class State(TypedDict): input: str result: str # Initialize the DynamoDB checkpoint saver checkpointer = DynamoDBSaver( table_name="langgraph-checkpoints", region_name="us-east-1" ) # Build your LangGraph workflow builder = StateGraph(State) builder.add_node("process", lambda state: {"result": "processed"}) builder.set_entry_point("process") builder.set_finish_point("process") # Compile the graph with the DynamoDB checkpointer graph = builder.compile(checkpointer=checkpointer) # Invoke the graph with a thread ID to enable state persistence config = {"configurable": {"thread_id": "session-123"}} result = graph.invoke({"input": "data"}, config)

配置中的 thread_id 充当 DynamoDB 中的分区键,支持您维护单独的对话线程并检索任何线程的历史状态。

生产配置

对于生产部署,您可以启用生存时间、压缩和 Amazon S3 卸载。也可以使用 endpoint_url 参数来指向本地 DynamoDB 实例以进行测试:

import boto3 from botocore.config import Config from langgraph_checkpoint_aws import DynamoDBSaver # Production configuration session = boto3.Session( profile_name="production", region_name="us-east-1" ) checkpointer = DynamoDBSaver( table_name="langgraph-checkpoints", session=session, ttl_seconds=86400 * 7, # Expire checkpoints after 7 days enable_checkpoint_compression=True, # Enable gzip compression boto_config=Config( retries={"mode": "adaptive", "max_attempts": 6}, max_pool_connections=50 ), s3_offload_config={ "bucket_name": "my-checkpoint-bucket" } ) # Local testing with DynamoDB Local local_checkpointer = DynamoDBSaver( table_name="langgraph-checkpoints", region_name="us-east-1", endpoint_url="http://localhost:8000" )

DynamoDB 表配置

检查点保护程序需要一个具有复合主键的 DynamoDB 表。使用以下 AWS CloudFormation 模板创建表:

AWSTemplateFormatVersion: '2010-09-09' Description: 'DynamoDB table for LangGraph checkpoint storage' Parameters: TableName: Type: String Default: langgraph-checkpoints Resources: CheckpointTable: Type: AWS::DynamoDB::Table DeletionPolicy: Retain UpdateReplacePolicy: Retain Properties: TableName: !Ref TableName BillingMode: PAY_PER_REQUEST AttributeDefinitions: - AttributeName: PK AttributeType: S - AttributeName: SK AttributeType: S KeySchema: - AttributeName: PK KeyType: HASH - AttributeName: SK KeyType: RANGE TimeToLiveSpecification: AttributeName: ttl Enabled: true PointInTimeRecoverySpecification: PointInTimeRecoveryEnabled: true SSESpecification: SSEEnabled: true

使用 AWS CLI 部署模板:

aws cloudformation deploy \ --template-file template.yaml \ --stack-name langgraph-checkpoint \ --parameter-overrides TableName=langgraph-checkpoints

所需的 IAM 权限

以下 IAM 策略提供了 DynamoDB 检查点保护程序所需的最小权限。将 111122223333 替换为您的 AWS 账户 ID,并更新区域以匹配您的环境。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query", "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Resource": "arn:aws:dynamodb:us-east-1:111122223333:table/langgraph-checkpoints" } ] }

如果启用 Amazon S3 卸载,请在策略中添加以下语句:

{ "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:DeleteObject", "s3:PutObjectTagging" ], "Resource": "arn:aws:s3:::my-checkpoint-bucket/*" }, { "Effect": "Allow", "Action": [ "s3:GetBucketLifecycleConfiguration", "s3:PutBucketLifecycleConfiguration" ], "Resource": "arn:aws:s3:::my-checkpoint-bucket" }

异步使用

对于异步应用程序,请使用检查点保护程序提供的异步方法:

import asyncio from langgraph.graph import StateGraph from langgraph_checkpoint_aws import DynamoDBSaver from typing import TypedDict class State(TypedDict): input: str result: str async def main(): checkpointer = DynamoDBSaver( table_name="langgraph-checkpoints", region_name="us-east-1" ) builder = StateGraph(State) builder.add_node("process", lambda state: {"result": "processed"}) builder.set_entry_point("process") builder.set_finish_point("process") graph = builder.compile(checkpointer=checkpointer) config = {"configurable": {"thread_id": "async-session-123"}} result = await graph.ainvoke({"input": "data"}, config) return result asyncio.run(main())

清理

为避免持续产生费用,请删除您创建的资源:

# Delete the DynamoDB table aws dynamodb delete-table --table-name langgraph-checkpoints # Delete the CloudFormation stack (if you used the template above) aws cloudformation delete-stack --stack-name langgraph-checkpoint # If you created an S3 bucket for large checkpoint offloading, empty and delete it aws s3 rm s3://my-checkpoint-bucket --recursive aws s3 rb s3://my-checkpoint-bucket

错误处理

常见错误情景:

  • 找不到表:验证 table_nameregion_name 与您的 DynamoDB 表匹配。

  • 节流:如果您看到 ProvisionedThroughputExceededException,请考虑切换到按需计费模式或增加预置容量。

  • 超出项目大小:如果检查点超过 350 KB,请启用 Amazon S3 卸载(请参阅生产配置)。

  • 凭证错误:验证您的 AWS 凭证有效且具有所需的权限

其他资源