本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
与代理框架一起使用
您可以将 OpenSearch MCP 服务器直接集成到 Python 代理框架中,从而在更大的工作流程中 OpenSearch 为自治代理提供编程访问权限。
Strands Agents
StrandsAWS_OPENSEARCH_SERVERLESS到 OpenSearch Serverless 集合true时设置为;对于托管域,则将其省略。
import os from strands import Agent from strands.tools.mcp import MCPClient from mcp import stdio_client, StdioServerParameters # For a managed domain: # OPENSEARCH_URL = https://<domain-endpoint>.<region>.es.amazonaws.com # # For an OpenSearch Serverless collection, also set: # AWS_OPENSEARCH_SERVERLESS = true # OPENSEARCH_URL = https://<collection-id>.<region>.aoss.amazonaws.com opensearch_client = MCPClient( lambda: stdio_client( StdioServerParameters( command="uvx", args=["opensearch-mcp-server-py"], env={ "OPENSEARCH_URL": os.environ["OPENSEARCH_URL"], "AWS_REGION": os.environ["AWS_REGION"], "AWS_IAM_ARN": os.environ["AWS_IAM_ARN"], # Set to "true" for OpenSearch Serverless, omit for managed domains "AWS_OPENSEARCH_SERVERLESS": os.environ.get("AWS_OPENSEARCH_SERVERLESS", "false"), }, ) ) ) with opensearch_client: agent = Agent(tools=opensearch_client.list_tools_sync()) response = agent("List all indexes and show the document count for each") print(response)
Strands 使用 Amazon Bedrock 作为其默认模型提供商。确保在您所在地区为 Claude 配置了 AWS 凭证并启用了模型访问权限。有关详细信息,请参阅 Strands Bedrock 提供商文档
LangGraph
LangGraphlangchain-mcp-adapters到由 Amazon Bedrock 支持的 LangGraph ReAct 代理中。与 Strands 一样,true在AWS_OPENSEARCH_SERVERLESS连接到 OpenSearch 无服务器集合时将其设置为。
import asyncio import os from langchain_aws import ChatBedrock from langchain_mcp_adapters.client import MultiServerMCPClient from langgraph.prebuilt import create_react_agent async def main(): async with MultiServerMCPClient( { "opensearch": { "command": "uvx", "args": ["opensearch-mcp-server-py"], "env": { # Managed domain: https://<domain-endpoint>.<region>.es.amazonaws.com # Serverless: https://<collection-id>.<region>.aoss.amazonaws.com "OPENSEARCH_URL": os.environ["OPENSEARCH_URL"], "AWS_REGION": os.environ["AWS_REGION"], "AWS_IAM_ARN": os.environ["AWS_IAM_ARN"], # Set to "true" for OpenSearch Serverless, omit for managed domains "AWS_OPENSEARCH_SERVERLESS": os.environ.get("AWS_OPENSEARCH_SERVERLESS", "false"), }, "transport": "stdio", } } ) as mcp_client: tools = mcp_client.get_tools() model = ChatBedrock( model_id="anthropic.claude-3-5-sonnet-20241022-v2:0", region_name=os.environ["AWS_REGION"], ) agent = create_react_agent(model, tools) result = await agent.ainvoke( {"messages": [{"role": "user", "content": "Check cluster health and list all indexes"}]} ) print(result["messages"][-1].content) asyncio.run(main())
安装所需的软件包:
pip install langchain-aws langchain-mcp-adapters langgraph