使用适用于 Python 的 SDK (Boto3) 的亚马逊 Bedrock Agents 运行时示例 - AWS SDK 代码示例

文档 AWS SDK 示例 GitHub 存储库中还有更多 S AWS DK 示例

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用适用于 Python 的 SDK (Boto3) 的亚马逊 Bedrock Agents 运行时示例

以下代码示例向您展示了如何使用 适用于 Python (Boto3) 的 AWS SDK 与 Amazon Bedrock Agents 运行时一起使用来执行操作和实现常见场景。

基础知识是向您展示如何在服务中执行基本操作的代码示例。

操作是大型程序的代码摘录,必须在上下文中运行。您可以通过操作了解如何调用单个服务函数,还可以通过函数相关场景的上下文查看操作。

场景是向您演示如何通过在一个服务中调用多个函数或与其他 AWS 服务结合来完成特定任务的代码示例。

每个示例都包含一个指向完整源代码的链接,您可以从中找到有关如何在上下文中设置和运行代码的说明。

基本功能

以下代码示例显示了 InvokeFlow 如何使用与包含代理节点的 Amazon Bedrock 流程进行对话。

有关更多信息,请参阅使用 Amazon Bedrock 流程进行交谈

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

""" Shows how to run an Amazon Bedrock flow with InvokeFlow and handle muli-turn interaction for a single conversation. For more information, see https://docs.aws.amazon.com/bedrock/latest/userguide/flows-multi-turn-invocation.html. """ import logging import boto3 import botocore import botocore.exceptions logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def invoke_flow(client, flow_id, flow_alias_id, input_data, execution_id): """ Invoke an Amazon Bedrock flow and handle the response stream. Args: client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to invoke. flow_alias_id: The alias ID of the flow. input_data: Input data for the flow. execution_id: Execution ID for continuing a flow. Use the value None on first run. Returns: Dict containing flow_complete status, input_required info, and execution_id """ response = None request_params = None if execution_id is None: # Don't pass execution ID for first run. request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "inputs": [input_data], "enableTrace": True } else: request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "executionId": execution_id, "inputs": [input_data], "enableTrace": True } response = client.invoke_flow(**request_params) if "executionId" not in request_params: execution_id = response['executionId'] input_required = None flow_status = "" # Process the streaming response for event in response['responseStream']: # Check if flow is complete. if 'flowCompletionEvent' in event: flow_status = event['flowCompletionEvent']['completionReason'] # Check if more input us needed from user. elif 'flowMultiTurnInputRequestEvent' in event: input_required = event # Print the model output. elif 'flowOutputEvent' in event: print(event['flowOutputEvent']['content']['document']) # Log trace events. elif 'flowTraceEvent' in event: logger.info("Flow trace: %s", event['flowTraceEvent']) return { "flow_status": flow_status, "input_required": input_required, "execution_id": execution_id } def converse_with_flow(bedrock_agent_client, flow_id, flow_alias_id): """ Run a conversation with the supplied flow. Args: bedrock_agent_client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to run. flow_alias_id: The alias ID of the flow. """ flow_execution_id = None finished = False # Get the intial prompt from the user. user_input = input("Enter input: ") # Use prompt to create input data. flow_input_data = { "content": { "document": user_input }, "nodeName": "FlowInputNode", "nodeOutputName": "document" } try: while not finished: # Invoke the flow until successfully finished. result = invoke_flow( bedrock_agent_client, flow_id, flow_alias_id, flow_input_data, flow_execution_id) status = result['flow_status'] flow_execution_id = result['execution_id'] more_input = result['input_required'] if status == "INPUT_REQUIRED": # The flow needs more information from the user. logger.info("The flow %s requires more input", flow_id) user_input = input( more_input['flowMultiTurnInputRequestEvent']['content']['document'] + ": ") flow_input_data = { "content": { "document": user_input }, "nodeName": more_input['flowMultiTurnInputRequestEvent']['nodeName'], "nodeInputName": "agentInputText" } elif status == "SUCCESS": # The flow completed successfully. finished = True logger.info("The flow %s successfully completed.", flow_id) except botocore.exceptions.ClientError as e: print(f"Client error: {str(e)}") logger.error("Client error: %s", {str(e)}) except Exception as e: print(f"An error occurred: {str(e)}") logger.error("An error occurred: %s", {str(e)}) logger.error("Error type: %s", {type(e)}) def main(): """ Main entry point for the script. """ # Replace these with your actual flow ID and flow alias ID. FLOW_ID = 'YOUR_FLOW_ID' FLOW_ALIAS_ID = 'YOUR_FLOW_ALIAS_ID' logger.info("Starting conversation with FLOW: %s ID: %s", FLOW_ID, FLOW_ALIAS_ID) # Get the Bedrock agent runtime client. session = boto3.Session(profile_name='default') bedrock_agent_client = session.client('bedrock-agent-runtime') # Start the conversation. converse_with_flow(bedrock_agent_client, FLOW_ID, FLOW_ALIAS_ID) logger.info("Conversation with FLOW: %s ID: %s finished", FLOW_ID, FLOW_ALIAS_ID) if __name__ == "__main__": main()
  • 有关 API 的详细信息,请参阅适用InvokeFlowPython 的AWS SDK (Boto3) API 参考

操作

以下代码示例演示了如何使用 InvokeAgent

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

调用代理。

def invoke_agent(self, agent_id, agent_alias_id, session_id, prompt): """ Sends a prompt for the agent to process and respond to. :param agent_id: The unique identifier of the agent to use. :param agent_alias_id: The alias of the agent to use. :param session_id: The unique identifier of the session. Use the same value across requests to continue the same conversation. :param prompt: The prompt that you want Claude to complete. :return: Inference response from the model. """ try: # Note: The execution time depends on the foundation model, complexity of the agent, # and the length of the prompt. In some cases, it can take up to a minute or more to # generate a response. response = self.agents_runtime_client.invoke_agent( agentId=agent_id, agentAliasId=agent_alias_id, sessionId=session_id, inputText=prompt, ) completion = "" for event in response.get("completion"): chunk = event["chunk"] completion = completion + chunk["bytes"].decode() except ClientError as e: logger.error(f"Couldn't invoke agent. {e}") raise return completion
  • 有关 API 的详细信息,请参阅适用InvokeAgentPython 的AWS SDK (Boto3) API 参考

以下代码示例演示了如何使用 InvokeFlow

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

调用流程。

def invoke_flow(self, flow_id, flow_alias_id, input_data, execution_id): """ Invoke an Amazon Bedrock flow and handle the response stream. Args: param flow_id: The ID of the flow to invoke. param flow_alias_id: The alias ID of the flow. param input_data: Input data for the flow. param execution_id: Execution ID for continuing a flow. Use the value None on first run. Return: Response from the flow. """ try: request_params = None if execution_id is None: # Don't pass execution ID for first run. request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "inputs": input_data, "enableTrace": True } else: request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "executionId": execution_id, "inputs": input_data, "enableTrace": True } response = self.agents_runtime_client.invoke_flow(**request_params) if "executionId" not in request_params: execution_id = response['executionId'] result = "" # Get the streaming response for event in response['responseStream']: result = result + str(event) + '\n' print(result) except ClientError as e: logger.error("Couldn't invoke flow %s.", {e}) raise return result
  • 有关 API 的详细信息,请参阅适用InvokeFlowPython 的AWS SDK (Boto3) API 参考

场景

以下代码示例展示了如何:

  • 为流程创建执行角色。

  • 创建流。

  • 部署完全配置的流程。

  • 使用用户提供的提示调用流程。

  • 删除所有已创建的资源。

适用于 Python 的 SDK(Boto3)
注意

还有更多相关信息 GitHub。在 AWS 代码示例存储库中查找完整示例,了解如何进行设置和运行。

根据用户指定的流派和歌曲数量生成音乐播放列表。

from datetime import datetime import logging import boto3 from botocore.exceptions import ClientError from roles import create_flow_role, delete_flow_role, update_role_policy from flow import create_flow, prepare_flow, delete_flow from run_flow import run_playlist_flow from flow_version import create_flow_version, delete_flow_version from flow_alias import create_flow_alias, delete_flow_alias logging.basicConfig( level=logging.INFO ) logger = logging.getLogger(__name__) def create_input_node(name): """ Creates an input node configuration for an Amazon Bedrock flow. The input node serves as the entry point for the flow and defines the initial document structure that will be passed to subsequent nodes. Args: name (str): The name of the input node. Returns: dict: The input node configuration. """ return { "type": "Input", "name": name, "outputs": [ { "name": "document", "type": "Object" } ] } def create_prompt_node(name, model_id): """ Creates a prompt node configuration for a Bedrock flow that generates music playlists. The prompt node defines an inline prompt template that creates a music playlist based on a specified genre and number of songs. The prompt uses two variables that are mapped from the input JSON object: - {{genre}}: The genre of music to create a playlist for - {{number}}: The number of songs to include in the playlist Args: name (str): The name of the prompt node. model_id (str): The identifier of the foundation model to use for the prompt. Returns: dict: The prompt node. """ return { "type": "Prompt", "name": name, "configuration": { "prompt": { "sourceConfiguration": { "inline": { "modelId": model_id, "templateType": "TEXT", "inferenceConfiguration": { "text": { "temperature": 0.8 } }, "templateConfiguration": { "text": { "text": "Make me a {{genre}} playlist consisting of the following number of songs: {{number}}." } } } } } }, "inputs": [ { "name": "genre", "type": "String", "expression": "$.data.genre" }, { "name": "number", "type": "Number", "expression": "$.data.number" } ], "outputs": [ { "name": "modelCompletion", "type": "String" } ] } def create_output_node(name): """ Creates an output node configuration for a Bedrock flow. The output node validates that the output from the last node is a string and returns it unmodified. The input name must be "document". Args: name (str): The name of the output node. Returns: dict: The output node configuration containing the output node: """ return { "type": "Output", "name": name, "inputs": [ { "name": "document", "type": "String", "expression": "$.data" } ] } def create_playlist_flow(client, flow_name, flow_description, role_arn, prompt_model_id): """ Creates the playlist generator flow. Args: client: bedrock agent boto3 client. role_arn (str): Name for the new IAM role. prompt_model_id (str): The id of the model to use in the prompt node. Returns: dict: The response from the create_flow operation. """ input_node = create_input_node("FlowInput") prompt_node = create_prompt_node("MakePlaylist", prompt_model_id) output_node = create_output_node("FlowOutput") # Create connections between the nodes connections = [] # First, create connections between the output of the flow # input node and each input of the prompt node. for prompt_node_input in prompt_node["inputs"]: connections.append( { "name": "_".join([input_node["name"], prompt_node["name"], prompt_node_input["name"]]), "source": input_node["name"], "target": prompt_node["name"], "type": "Data", "configuration": { "data": { "sourceOutput": input_node["outputs"][0]["name"], "targetInput": prompt_node_input["name"] } } } ) # Then, create a connection between the output of the prompt node and the input of the flow output node connections.append( { "name": "_".join([prompt_node["name"], output_node["name"]]), "source": prompt_node["name"], "target": output_node["name"], "type": "Data", "configuration": { "data": { "sourceOutput": prompt_node["outputs"][0]["name"], "targetInput": output_node["inputs"][0]["name"] } } } ) flow_def = { "nodes": [input_node, prompt_node, output_node], "connections": connections } # Create the flow. response = create_flow( client, flow_name, flow_description, role_arn, flow_def) return response def get_model_arn(client, model_id): """ Gets the Amazon Resource Name (ARN) for a model. Args: client (str): Amazon Bedrock boto3 client. model_id (str): The id of the model. Returns: str: The ARN of the model. """ try: # Call GetFoundationModelDetails operation response = client.get_foundation_model(modelIdentifier=model_id) # Extract model ARN from the response model_arn = response['modelDetails']['modelArn'] return model_arn except ClientError as e: logger.exception("Client error getting model ARN: %s", {str(e)}) raise except Exception as e: logger.exception("Unexpected error getting model ARN: %s", {str(e)}) raise def prepare_flow_version_and_alias(bedrock_agent_client, flow_id): """ Prepares the flow and then creates a flow version and flow alias. Args: bedrock_agent_client: Amazon Bedrock Agent boto3 client. flowd_id (str): The ID of the flow that you want to prepare. Returns: The flow_version and flow_alias. """ status = prepare_flow(bedrock_agent_client, flow_id) flow_version = None flow_alias = None if status == 'Prepared': # Create the flow version and alias. flow_version = create_flow_version(bedrock_agent_client, flow_id, f"flow version for flow {flow_id}.") flow_alias = create_flow_alias(bedrock_agent_client, flow_id, flow_version, "latest", f"Alias for flow {flow_id}, version {flow_version}") return flow_version, flow_alias def delete_role_resources(bedrock_agent_client, iam_client, role_name, flow_id, flow_version, flow_alias): """ Deletes the flow, flow alias, flow version, and IAM roles. Args: bedrock_agent_client: Amazon Bedrock Agent boto3 client. iam_client: Amazon IAM boto3 client. role_name (str): The name of the IAM role. flow_id (str): The id of the flow. flow_version (str): The version of the flow. flow_alias (str): The alias of the flow. """ if flow_id is not None: if flow_alias is not None: delete_flow_alias(bedrock_agent_client, flow_id, flow_alias) if flow_version is not None: delete_flow_version(bedrock_agent_client, flow_id, flow_version) delete_flow(bedrock_agent_client, flow_id) if role_name is not None: delete_flow_role(iam_client, role_name) def main(): """ Creates, runs, and optionally deletes a Bedrock flow for generating music playlists. Note: Requires valid AWS credentials in the default profile """ delete_choice = "y" try: # Get various boto3 clients. session = boto3.Session(profile_name='default') bedrock_agent_runtime_client = session.client('bedrock-agent-runtime') bedrock_agent_client = session.client('bedrock-agent') bedrock_client = session.client('bedrock') iam_client = session.client('iam') role_name = None flow_id = None flow_version = None flow_alias = None #Change the model as needed. prompt_model_id = "amazon.nova-pro-v1:0" # Base the flow name on the current date and time current_time = datetime.now() timestamp = current_time.strftime("%Y-%m-%d-%H-%M-%S") flow_name = f"FlowPlayList_{timestamp}" flow_description = "A flow to generate a music playlist." # Create a role for the flow. role_name = f"BedrockFlowRole-{flow_name}" role = create_flow_role(iam_client, role_name) role_arn = role['Arn'] # Create the flow. response = create_playlist_flow( bedrock_agent_client, flow_name, flow_description, role_arn, prompt_model_id) flow_id = response.get('id') if flow_id: # Update accessible resources in the role. model_arn = get_model_arn(bedrock_client, prompt_model_id) update_role_policy(iam_client, role_name, [ response.get('arn'), model_arn]) # Prepare the flow and flow version. flow_version, flow_alias = prepare_flow_version_and_alias( bedrock_agent_client, flow_id) # Run the flow. if flow_version and flow_alias: run_playlist_flow(bedrock_agent_runtime_client, flow_id, flow_alias) delete_choice = input("Delete flow? y or n : ").lower() else: print("Couldn't run. Deleting flow and role.") delete_flow(bedrock_agent_client, flow_id) delete_flow_role(iam_client, role_name) else: print("Couldn't create flow.") except Exception as e: print(f"Fatal error: {str(e)}") finally: if delete_choice == 'y': delete_role_resources(bedrock_agent_client, iam_client, role_name, flow_id, flow_version, flow_alias) else: print("Flow not deleted. ") print(f"\tFlow ID: {flow_id}") print(f"\tFlow version: {flow_version}") print(f"\tFlow alias: {flow_alias}") print(f"\tRole ARN: {role_arn}") print("Done!") if __name__ == "__main__": main() def invoke_flow(client, flow_id, flow_alias_id, input_data): """ Invoke an Amazon Bedrock flow and handle the response stream. Args: client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to invoke. flow_alias_id: The alias ID of the flow. input_data: Input data for the flow. Returns: Dict containing flow status and flow output. """ response = None request_params = None request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "inputs": [input_data], "enableTrace": True } response = client.invoke_flow(**request_params) flow_status = "" output= "" # Process the streaming response for event in response['responseStream']: # Check if flow is complete. if 'flowCompletionEvent' in event: flow_status = event['flowCompletionEvent']['completionReason'] # Save the model output. elif 'flowOutputEvent' in event: output = event['flowOutputEvent']['content']['document'] logger.info("Output : %s", output) # Log trace events. elif 'flowTraceEvent' in event: logger.info("Flow trace: %s", event['flowTraceEvent']) return { "flow_status": flow_status, "output": output } def run_playlist_flow(bedrock_agent_client, flow_id, flow_alias_id): """ Runs the playlist generator flow. Args: bedrock_agent_client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to run. flow_alias_id: The alias ID of the flow. """ print ("Welcome to the playlist generator flow.") # Get the initial prompt from the user. genre = input("Enter genre: ") number_of_songs = int(input("Enter number of songs: ")) # Use prompt to create input data for the input node. flow_input_data = { "content": { "document": { "genre" : genre, "number" : number_of_songs } }, "nodeName": "FlowInput", "nodeOutputName": "document" } try: result = invoke_flow( bedrock_agent_client, flow_id, flow_alias_id, flow_input_data) status = result['flow_status'] if status == "SUCCESS": # The flow completed successfully. logger.info("The flow %s successfully completed.", flow_id) print(result['output']) else: logger.warning("Flow status: %s",status) except ClientError as e: print(f"Client error: {str(e)}") logger.error("Client error: %s", {str(e)}) raise except Exception as e: logger.error("An error occurred: %s", {str(e)}) logger.error("Error type: %s", {type(e)}) raise def create_flow_role(client, role_name): """ Creates an IAM role for Amazon Bedrock with permissions to run a flow. Args: role_name (str): Name for the new IAM role. Returns: str: The role Amazon Resource Name. """ # Trust relationship policy - allows Amazon Bedrock service to assume this role. trust_policy = { "Version": "2012-10-17", "Statement": [{ "Effect": "Allow", "Principal": { "Service": "bedrock.amazonaws.com" }, "Action": "sts:AssumeRole" }] } # Basic inline policy for for running a flow. resources = "*" bedrock_policy = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "bedrock:InvokeModel", "bedrock:Retrieve", "bedrock:RetrieveAndGenerate" ], # Using * as placeholder - Later you update with specific ARNs. "Resource": resources } ] } try: # Create the IAM role with trust policy logging.info("Creating role: %s",role_name) role = client.create_role( RoleName=role_name, AssumeRolePolicyDocument=json.dumps(trust_policy), Description="Role for Amazon Bedrock operations" ) # Attach inline policy to the role print("Attaching inline policy") client.put_role_policy( RoleName=role_name, PolicyName=f"{role_name}-policy", PolicyDocument=json.dumps(bedrock_policy) ) logging.info("Create Role ARN: %s", role['Role']['Arn']) return role['Role'] except ClientError as e: logging.warning("Error creating role: %s", str(e)) raise except Exception as e: logging.warning("Unexpected error: %s", str(e)) raise def update_role_policy(client, role_name, resource_arns): """ Updates an IAM role's inline policy with specific resource ARNs. Args: role_name (str): Name of the existing role. resource_arns (list): List of resource ARNs to allow access to. """ updated_policy = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "bedrock:GetFlow", "bedrock:InvokeModel", "bedrock:Retrieve", "bedrock:RetrieveAndGenerate" ], "Resource": resource_arns } ] } try: client.put_role_policy( RoleName=role_name, PolicyName=f"{role_name}-policy", PolicyDocument=json.dumps(updated_policy) ) logging.info("Updated policy for role: %s",role_name) except ClientError as e: logging.warning("Error updating role policy: %s", str(e)) raise def delete_flow_role(client, role_name): """ Deletes an IAM role. Args: role_name (str): Name of the role to delete. """ try: # Detach and delete inline policies policies = client.list_role_policies(RoleName=role_name)['PolicyNames'] for policy_name in policies: client.delete_role_policy(RoleName=role_name, PolicyName=policy_name) # Delete the role client.delete_role(RoleName=role_name) logging.info("Deleted role: %s", role_name) except ClientError as e: logging.info("Error Deleting role: %s", str(e)) raise

以下代码示例演示了如何使用 Amazon Bedrock 和 Step Functions 构建和编排生成式人工智能应用程序。

适用于 Python 的 SDK(Boto3)

Amazon Bedrock 无服务器提示串接场景演示了如何使用 AWS Step FunctionsAmazon Bedrockhttps://docs.aws.amazon.com/bedrock/latest/userguide/agents.html 来构建和编排复杂、无服务器且高度可扩展的生成式人工智能应用程序。该场景包含以下工作示例:

  • 为文学博客撰写一篇指定小说的分析。此示例说明了一个简单的、按顺序排列的提示链。

  • 生成一篇有关指定主题的短篇小说。此示例说明了人工智能如何以迭代方式处理其先前生成的项目列表。

  • 针对前往指定目的地的周末度假制定一份行程计划。此示例说明了如何并行处理多个不同的提示。

  • 向担任电影制片人的人类用户推销电影创意。此示例说明了如何使用不同的推理参数对同一个提示进行并行处理,如何回溯到链中的上一个步骤,以及如何将人工输入作为工作流程的一部分。

  • 根据用户手头的食材制定一个膳食计划。此示例说明了提示链如何整合两个不同的人工智能对话,通过两个人工智能角色相互进行辩论来改善最终结果。

  • 查找并总结当今最热门的 GitHub 存储库。此示例说明如何链接多个与外部 APIs交互的 AI 代理。

有关完整的源代码以及设置和运行说明,请参阅上的完整项目GitHub

本示例中使用的服务
  • Amazon Bedrock

  • Amazon Bedrock 运行时系统

  • Amazon Bedrock 代理

  • Amazon Bedrock 代理运行时

  • Step Functions