Pilih preferensi cookie Anda

Kami menggunakan cookie penting serta alat serupa yang diperlukan untuk menyediakan situs dan layanan. Kami menggunakan cookie performa untuk mengumpulkan statistik anonim sehingga kami dapat memahami cara pelanggan menggunakan situs dan melakukan perbaikan. Cookie penting tidak dapat dinonaktifkan, tetapi Anda dapat mengklik “Kustom” atau “Tolak” untuk menolak cookie performa.

Jika Anda setuju, AWS dan pihak ketiga yang disetujui juga akan menggunakan cookie untuk menyediakan fitur situs yang berguna, mengingat preferensi Anda, dan menampilkan konten yang relevan, termasuk iklan yang relevan. Untuk menerima atau menolak semua cookie yang tidak penting, klik “Terima” atau “Tolak”. Untuk membuat pilihan yang lebih detail, klik “Kustomisasi”.

Berkomunikasi dengan aliran Amazon Bedrock - AWS Contoh Kode SDK

Ada lebih banyak contoh AWS SDK yang tersedia di repo Contoh SDK AWS Doc. GitHub

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Ada lebih banyak contoh AWS SDK yang tersedia di repo Contoh SDK AWS Doc. GitHub

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Berkomunikasi dengan aliran Amazon Bedrock

Contoh kode berikut menunjukkan cara menggunakan InvokeFlow untuk berkomunikasi dengan aliran Amazon Bedrock yang menyertakan node agen.

Untuk informasi selengkapnya, lihat Berbicara dengan alur Amazon Bedrock.

Python
SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS.

""" Shows how to run an Amazon Bedrock flow with InvokeFlow and handle muli-turn interaction for a single conversation. For more information, see https://docs.aws.amazon.com/bedrock/latest/userguide/flows-multi-turn-invocation.html. """ import logging import boto3 import botocore import botocore.exceptions logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def invoke_flow(client, flow_id, flow_alias_id, input_data, execution_id): """ Invoke an Amazon Bedrock flow and handle the response stream. Args: client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to invoke. flow_alias_id: The alias ID of the flow. input_data: Input data for the flow. execution_id: Execution ID for continuing a flow. Use the value None on first run. Returns: Dict containing flow_complete status, input_required info, and execution_id """ response = None request_params = None if execution_id is None: # Don't pass execution ID for first run. request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "inputs": [input_data], "enableTrace": True } else: request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "executionId": execution_id, "inputs": [input_data], "enableTrace": True } response = client.invoke_flow(**request_params) if "executionId" not in request_params: execution_id = response['executionId'] input_required = None flow_status = "" # Process the streaming response for event in response['responseStream']: # Check if flow is complete. if 'flowCompletionEvent' in event: flow_status = event['flowCompletionEvent']['completionReason'] # Check if more input us needed from user. elif 'flowMultiTurnInputRequestEvent' in event: input_required = event # Print the model output. elif 'flowOutputEvent' in event: print(event['flowOutputEvent']['content']['document']) # Log trace events. elif 'flowTraceEvent' in event: logger.info("Flow trace: %s", event['flowTraceEvent']) return { "flow_status": flow_status, "input_required": input_required, "execution_id": execution_id } def converse_with_flow(bedrock_agent_client, flow_id, flow_alias_id): """ Run a conversation with the supplied flow. Args: bedrock_agent_client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to run. flow_alias_id: The alias ID of the flow. """ flow_execution_id = None finished = False # Get the intial prompt from the user. user_input = input("Enter input: ") # Use prompt to create input data. flow_input_data = { "content": { "document": user_input }, "nodeName": "FlowInputNode", "nodeOutputName": "document" } try: while not finished: # Invoke the flow until successfully finished. result = invoke_flow( bedrock_agent_client, flow_id, flow_alias_id, flow_input_data, flow_execution_id) status = result['flow_status'] flow_execution_id = result['execution_id'] more_input = result['input_required'] if status == "INPUT_REQUIRED": # The flow needs more information from the user. logger.info("The flow %s requires more input", flow_id) user_input = input( more_input['flowMultiTurnInputRequestEvent']['content']['document'] + ": ") flow_input_data = { "content": { "document": user_input }, "nodeName": more_input['flowMultiTurnInputRequestEvent']['nodeName'], "nodeInputName": "agentInputText" } elif status == "SUCCESS": # The flow completed successfully. finished = True logger.info("The flow %s successfully completed.", flow_id) except botocore.exceptions.ClientError as e: print(f"Client error: {str(e)}") logger.error("Client error: %s", {str(e)}) except Exception as e: print(f"An error occurred: {str(e)}") logger.error("An error occurred: %s", {str(e)}) logger.error("Error type: %s", {type(e)}) def main(): """ Main entry point for the script. """ # Replace these with your actual flow ID and flow alias ID. FLOW_ID = 'YOUR_FLOW_ID' FLOW_ALIAS_ID = 'YOUR_FLOW_ALIAS_ID' logger.info("Starting conversation with FLOW: %s ID: %s", FLOW_ID, FLOW_ALIAS_ID) # Get the Bedrock agent runtime client. session = boto3.Session(profile_name='default') bedrock_agent_client = session.client('bedrock-agent-runtime') # Start the conversation. converse_with_flow(bedrock_agent_client, FLOW_ID, FLOW_ALIAS_ID) logger.info("Conversation with FLOW: %s ID: %s finished", FLOW_ID, FLOW_ALIAS_ID) if __name__ == "__main__": main()
  • Untuk detail API, lihat InvokeFlowdi AWS SDK for Python (Boto3) Referensi API.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara pengaturan dan menjalankannya di Repositori Contoh Kode AWS.

""" Shows how to run an Amazon Bedrock flow with InvokeFlow and handle muli-turn interaction for a single conversation. For more information, see https://docs.aws.amazon.com/bedrock/latest/userguide/flows-multi-turn-invocation.html. """ import logging import boto3 import botocore import botocore.exceptions logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) def invoke_flow(client, flow_id, flow_alias_id, input_data, execution_id): """ Invoke an Amazon Bedrock flow and handle the response stream. Args: client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to invoke. flow_alias_id: The alias ID of the flow. input_data: Input data for the flow. execution_id: Execution ID for continuing a flow. Use the value None on first run. Returns: Dict containing flow_complete status, input_required info, and execution_id """ response = None request_params = None if execution_id is None: # Don't pass execution ID for first run. request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "inputs": [input_data], "enableTrace": True } else: request_params = { "flowIdentifier": flow_id, "flowAliasIdentifier": flow_alias_id, "executionId": execution_id, "inputs": [input_data], "enableTrace": True } response = client.invoke_flow(**request_params) if "executionId" not in request_params: execution_id = response['executionId'] input_required = None flow_status = "" # Process the streaming response for event in response['responseStream']: # Check if flow is complete. if 'flowCompletionEvent' in event: flow_status = event['flowCompletionEvent']['completionReason'] # Check if more input us needed from user. elif 'flowMultiTurnInputRequestEvent' in event: input_required = event # Print the model output. elif 'flowOutputEvent' in event: print(event['flowOutputEvent']['content']['document']) # Log trace events. elif 'flowTraceEvent' in event: logger.info("Flow trace: %s", event['flowTraceEvent']) return { "flow_status": flow_status, "input_required": input_required, "execution_id": execution_id } def converse_with_flow(bedrock_agent_client, flow_id, flow_alias_id): """ Run a conversation with the supplied flow. Args: bedrock_agent_client: Boto3 client for Amazon Bedrock agent runtime. flow_id: The ID of the flow to run. flow_alias_id: The alias ID of the flow. """ flow_execution_id = None finished = False # Get the intial prompt from the user. user_input = input("Enter input: ") # Use prompt to create input data. flow_input_data = { "content": { "document": user_input }, "nodeName": "FlowInputNode", "nodeOutputName": "document" } try: while not finished: # Invoke the flow until successfully finished. result = invoke_flow( bedrock_agent_client, flow_id, flow_alias_id, flow_input_data, flow_execution_id) status = result['flow_status'] flow_execution_id = result['execution_id'] more_input = result['input_required'] if status == "INPUT_REQUIRED": # The flow needs more information from the user. logger.info("The flow %s requires more input", flow_id) user_input = input( more_input['flowMultiTurnInputRequestEvent']['content']['document'] + ": ") flow_input_data = { "content": { "document": user_input }, "nodeName": more_input['flowMultiTurnInputRequestEvent']['nodeName'], "nodeInputName": "agentInputText" } elif status == "SUCCESS": # The flow completed successfully. finished = True logger.info("The flow %s successfully completed.", flow_id) except botocore.exceptions.ClientError as e: print(f"Client error: {str(e)}") logger.error("Client error: %s", {str(e)}) except Exception as e: print(f"An error occurred: {str(e)}") logger.error("An error occurred: %s", {str(e)}) logger.error("Error type: %s", {type(e)}) def main(): """ Main entry point for the script. """ # Replace these with your actual flow ID and flow alias ID. FLOW_ID = 'YOUR_FLOW_ID' FLOW_ALIAS_ID = 'YOUR_FLOW_ALIAS_ID' logger.info("Starting conversation with FLOW: %s ID: %s", FLOW_ID, FLOW_ALIAS_ID) # Get the Bedrock agent runtime client. session = boto3.Session(profile_name='default') bedrock_agent_client = session.client('bedrock-agent-runtime') # Start the conversation. converse_with_flow(bedrock_agent_client, FLOW_ID, FLOW_ALIAS_ID) logger.info("Conversation with FLOW: %s ID: %s finished", FLOW_ID, FLOW_ALIAS_ID) if __name__ == "__main__": main()
  • Untuk detail API, lihat InvokeFlowdi AWS SDK for Python (Boto3) Referensi API.

PrivasiSyarat situsPreferensi cookie
© 2025, Amazon Web Services, Inc. atau afiliasinya. Semua hak dilindungi undang-undang.