Ada lebih banyak contoh AWS SDK yang tersedia di repo Contoh SDK AWS Doc
Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Berkomunikasi dengan aliran Amazon Bedrock
Contoh kode berikut menunjukkan cara menggunakan InvokeFlow untuk berkomunikasi dengan aliran Amazon Bedrock yang menyertakan node agen.
Untuk informasi selengkapnya, lihat Berbicara dengan alur Amazon Bedrock.
- SDK untuk Python (Boto3)
-
catatan
Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS
. """ Shows how to run an Amazon Bedrock flow with InvokeFlow and handle muli-turn interaction for a single conversation. For more information, see https://docs.aws.amazon.com/bedrock/latest/userguide/flows-multi-turn-invocation.html. """ import logging import boto3 import botocore import botocore.exceptions logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def invoke_flow(client, flow_id, flow_alias_id, input_data, execution_id): """ Invoke an Amazon Bedrock flow and handle the response stream. Args: client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to invoke. flow_alias_id: The alias ID of the flow. input_data: Input data for the flow. execution_id: Execution ID for continuing a flow. Use the value None on first run. Returns: Dict containing flow_complete status, input_required info, and execution_id """ response = None request_params = None if execution_id is None: # Don't pass execution ID for first run. request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "inputs": [input_data], "enableTrace": True } else: request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "executionId": execution_id, "inputs": [input_data], "enableTrace": True } response = client.invoke_flow(**request_params) if "executionId" not in request_params: execution_id = response['executionId'] input_required = None flow_status = "" # Process the streaming response for event in response['responseStream']: # Check if flow is complete. if 'flowCompletionEvent' in event: flow_status = event['flowCompletionEvent']['completionReason'] # Check if more input us needed from user. elif 'flowMultiTurnInputRequestEvent' in event: input_required = event # Print the model output. elif 'flowOutputEvent' in event: print(event['flowOutputEvent']['content']['document']) # Log trace events. elif 'flowTraceEvent' in event: logger.info("Flow trace: %s", event['flowTraceEvent']) return { "flow_status": flow_status, "input_required": input_required, "execution_id": execution_id } def converse_with_flow(bedrock_agent_client, flow_id, flow_alias_id): """ Run a conversation with the supplied flow. Args: bedrock_agent_client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to run. flow_alias_id: The alias ID of the flow. """ flow_execution_id = None finished = False # Get the intial prompt from the user. user_input = input("Enter input: ") # Use prompt to create input data. flow_input_data = { "content": { "document": user_input }, "nodeName": "FlowInputNode", "nodeOutputName": "document" } try: while not finished: # Invoke the flow until successfully finished. result = invoke_flow( bedrock_agent_client, flow_id, flow_alias_id, flow_input_data, flow_execution_id) status = result['flow_status'] flow_execution_id = result['execution_id'] more_input = result['input_required'] if status == "INPUT_REQUIRED": # The flow needs more information from the user. logger.info("The flow %s requires more input", flow_id) user_input = input( more_input['flowMultiTurnInputRequestEvent']['content']['document'] + ": ") flow_input_data = { "content": { "document": user_input }, "nodeName": more_input['flowMultiTurnInputRequestEvent']['nodeName'], "nodeInputName": "agentInputText" } elif status == "SUCCESS": # The flow completed successfully. finished = True logger.info("The flow %s successfully completed.", flow_id) except botocore.exceptions.ClientError as e: print(f"Client error: {str(e)}") logger.error("Client error: %s", {str(e)}) except Exception as e: print(f"An error occurred: {str(e)}") logger.error("An error occurred: %s", {str(e)}) logger.error("Error type: %s", {type(e)}) def main(): """ Main entry point for the script. """ # Replace these with your actual flow ID and flow alias ID. FLOW_ID = 'YOUR_FLOW_ID' FLOW_ALIAS_ID = 'YOUR_FLOW_ALIAS_ID' logger.info("Starting conversation with FLOW: %s ID: %s", FLOW_ID, FLOW_ALIAS_ID) # Get the Bedrock agent runtime client. session = boto3.Session(profile_name='default') bedrock_agent_client = session.client('bedrock-agent-runtime') # Start the conversation. converse_with_flow(bedrock_agent_client, FLOW_ID, FLOW_ALIAS_ID) logger.info("Conversation with FLOW: %s ID: %s finished", FLOW_ID, FLOW_ALIAS_ID) if __name__ == "__main__": main()
-
Untuk detail API, lihat InvokeFlowdi AWS SDK for Python (Boto3) Referensi API.
-