Parser-Lambda-Funktion in Agents for Amazon Bedrock - Amazon Bedrock

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Parser-Lambda-Funktion in Agents for Amazon Bedrock

Jede Eingabeaufforderungsvorlage enthält eine Parser-Lambda-Funktion, die Sie ändern können. Um eine benutzerdefinierte Parser-Lambda-Funktion zu schreiben, müssen Sie das Eingabeereignis, das Ihr Agent sendet, und die Antwort, die der Agent als Ausgabe der Lambda-Funktion erwartet, verstehen. Sie schreiben eine Handler-Funktion, um Variablen aus dem Eingabeereignis zu bearbeiten und die Antwort zurückzugeben. Weitere Informationen zur AWS Lambda Funktionsweise finden Sie unter Event-driven Invocation im Developer Guide. AWS Lambda

Parser-Lambda-Eingabeereignis

Im Folgenden finden Sie die allgemeine Struktur des Eingabeereignisses des Agenten. Schreiben Sie Ihre Lambda-Handler-Funktion in die Felder.

{ "messageVersion": "1.0", "agent": { "name": "string", "id": "string", "alias": "string", "version": "string" }, "invokeModelRawResponse": "string", "promptType": "ORCHESTRATION | POST_PROCESSING | PRE_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION ", "overrideType": "OUTPUT_PARSER" }

In der folgenden Liste werden die Eingabeereignisfelder beschrieben:

  • messageVersion: Die Version der Mitteilung, die das Format der Ereignisdaten, die in die Lambda-Funktion eingehen, und das erwartete Format der Antwort von der Lambda-Funktion identifiziert. Agents für Amazon Bedrock unterstützt nur die Version 1.0.

  • agent: Enthält Informationen über den Namen, die ID, den Alias und die Version des Agenten, dem die Eingabeaufforderung angehört.

  • invokeModelRawResponse: Die Rohausgabe des Basismodells der Eingabeaufforderung, deren Ausgabe analysiert werden soll.

  • promptType: Der Eingabeaufforderungstyp, deren Ausgabe analysiert werden soll.

  • overrideType: Die Artefakte, die diese Lambda-Funktion überschreibt. Derzeit OUTPUT_PARSER wird nur unterstützt, was darauf hinweist, dass der Standardparser überschrieben werden soll.

Antwort der Lambda-Funktion

Ihr Agent erwartet eine Antwort Ihrer Lambda-Funktion im folgenden Format. Der Agent verwendet die Antwort für die weitere Orchestrierung oder um ihm zu helfen, eine Antwort an den Benutzer zurückzugeben. Verwenden Sie die Antwortfelder der Lambda-Funktion, um zu konfigurieren, wie die Ausgabe zurückgegeben wird.

Wählen Sie die Registerkarte aus, die der Angabe entspricht, ob Sie die Aktionsgruppe mit einem OpenAPI Schema oder mit Funktionsdetails definiert haben:

OpenAPI schema
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "verb": "string", "actionGroupInput": { "<parameter>": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }
Function details
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "actionGroupInput": { "<parameter>": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }

In der folgenden Liste werden die Lambda-Antwortfelder beschrieben:

  • messageVersion: Die Version der Mitteilung, die das Format der Ereignisdaten, die in die Lambda-Funktion eingehen, und das erwartete Format der Antwort von einer Lambda-Funktion identifiziert. Agents für Amazon Bedrock unterstützt nur die Version 1.0.

  • promptType: Der Eingabeaufforderungstyp des aktuellen Abschnitts.

  • preProcessingParsedResponse: Die analysierte Antwort für den Eingabeaufforderungstyp PRE_PROCESSING.

  • orchestrationParsedResponse: Die analysierte Antwort für den Eingabeaufforderungstyp ORCHESTRATION. Weitere Details finden Sie unten.

  • knowledgeBaseResponseGenerationParsedResponse: Die analysierte Antwort für den Eingabeaufforderungstyp KNOWLEDGE_BASE_RESPONSE_GENERATION.

  • postProcessingParsedResponse: Die analysierte Antwort für den Eingabeaufforderungstyp POST_PROCESSING.

Weitere Informationen zu den analysierten Antworten für die vier Eingabeaufforderungsvorlagen finden Sie auf den folgenden Registerkarten.

preProcessingParsedResponse
{ "isValidInput": "boolean", "rationale": "string" }

Die preProcessingParsedResponse enthält die folgenden Felder:

  • isValidInput: Gibt an, ob die Benutzereingabe gültig ist oder nicht. Sie können die Funktion definieren, um zu bestimmen, wie die Gültigkeit von Benutzereingaben charakterisiert werden soll.

  • rationale: Die Begründung für die Kategorisierung von Benutzereingaben. Diese Begründung liefert das Modell in der Rohantwort, die Lambda-Funktion analysiert es und der Agent präsentiert es zur Vorverarbeitung im Trace.

orchestrationResponse

Das Format von orchestrationResponse hängt davon ab, ob Sie die Aktionsgruppe mit einem OpenAPI Schema oder mit Funktionsdetails definiert haben:

  • Wenn Sie die Aktionsgruppe mit einem OpenAPI Schema definiert haben, muss die Antwort das folgende Format haben:

    { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "verb": "string", "actionGroupInput": { "<parameter>": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } }
  • Wenn Sie die Aktionsgruppe mit Funktionsdetails definiert haben, muss die Antwort das folgende Format haben:

    { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "actionGroupInput": { "<parameter>": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } }

Das orchestrationParsedResponse enthält die folgenden Felder:

  • rationale: Die Begründung für die nächsten Schritte basierend auf den Ergebnissen des Basismodells. Sie können die Funktion definieren, die anhand der Modellausgabe analysiert werden soll.

  • parsingErrorDetails: Enthält die repromptResponse, mit der das Modell erneut aufgefordert wird, seine Rohantwort zu aktualisieren, wenn die Modellantwort nicht analysiert werden kann. Sie können die Funktion definieren, um zu steuern, wie das Modell erneut aufgefordert wird.

  • responseDetails: Enthält die Einzelheiten zur Handhabung der Ausgabe des Basismodells. Enthält einen invocationType (der nächste Schritt, den der Agent ausführen muss) und ein zweites Feld, das dem invocationType entsprechen sollte. Die folgenden Objekte sind möglich.

    • agentAskUser: Kompatibel mit dem ASK_USER-Aufruftyp. Dieser Aufruftyp beendet den Orchestrierungsschritt. Enthält den responseText, mit dem die Benutzer um weitere Informationen gebeten werden. Sie können Ihre Funktion definieren, um dieses Feld zu bearbeiten.

    • actionGroupInvocation: Kompatibel mit dem ACTION_GROUP-Aufruftyp. Sie können Ihre Lambda-Funktion so definieren, dass Aktionsgruppen aufgerufen und Parameter übergeben werden. Enthält die folgenden Felder:

      • actionGroupName: Die Aktionsgruppe, die aufgerufen werden soll.

      • Die folgenden Felder sind erforderlich, wenn Sie die Aktionsgruppe mit einem OpenAPI Schema definiert haben:

        • apiName— Der Name der API-Operation, die in der Aktionsgruppe aufgerufen werden soll.

        • verb— Die Methode der zu verwendenden API-Operation.

      • Das folgende Feld ist erforderlich, wenn Sie die Aktionsgruppe mit Funktionsdetails definiert haben:

        • functionName— Der Name der Funktion, die in der Aktionsgruppe aufgerufen werden soll.

      • actionGroupInput— Enthält Parameter, die in der API-Operationsanforderung angegeben werden müssen.

    • agentKnowledgeBase: Kompatibel mit dem KNOWLEDGE_BASE-Aufruftyp. Sie können Ihre Funktion definieren, um zu bestimmen, wie Wissensdatenbanken abgefragt werden sollen. Enthält die folgenden Felder:

      • knowledgeBaseId: Die eindeutige Kennung der Wissensdatenbank.

      • searchQuery— Enthält die Abfrage, die an die Wissensdatenbank in dem value Feld gesendet werden soll.

    • agentFinalResponse: Kompatibel mit dem FINISH-Aufruftyp. Dieser Aufruftyp beendet den Orchestrierungsschritt. Enthält die Antwort an den Benutzer im Feld responseText und Zitate für die Antwort im Objekt citations.

knowledgeBaseResponseGenerationParsedResponse
{ "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }

Das knowledgeBaseResponseGenerationParsedResponse enthält das generatedResponse Formular zum Abfragen der Wissensdatenbank und Verweise auf die Datenquellen.

postProcessingParsedResponse
{ "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }

Das postProcessingParsedResponse enthält die folgenden Felder:

  • responseText: Die Antwort, die an den Endbenutzer zurückgegeben werden soll. Sie können die Funktion zum Formatieren der Antwort definieren.

  • citations: Enthält eine Liste von Zitaten für die Antwort. Jedes Zitat zeigt den zitierten Text und seine Quellenangaben.

Beispiele für Parser-Lambda

Um Beispiele für Eingabeereignisse und Antworten der Lambda-Parser-Funktion zu sehen, wählen Sie eine der folgenden Registerkarten aus.

Pre-processing

Beispiel für ein Eingabeereignis

{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": " <thinking>\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n</thinking>\n\n<category>B</category>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "PRE_PROCESSING" }

Beispielantwort

{ "promptType": "PRE_PROCESSING", "preProcessingParsedResponse": { "rationale": "\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n", "isValidInput": false } }
Orchestration

Beispiel für ein Eingabeereignis

{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID23456::Search function.\\n\\n</scratchpad>\\n\\n<function_call>GET::x_amz_knowledgebase_KBID123456::Search(searchQuery=\"What is the phone number I can call?\)", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "ORCHESTRATION" }

Beispielantwort

{ "promptType": "ORCHESTRATION", "orchestrationParsedResponse": { "rationale": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call Farmers.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID123456::Search function.", "responseDetails": { "invocationType": "KNOWLEDGE_BASE", "agentKnowledgeBase": { "searchQuery": { "value": "What is the phone number I can call?" }, "knowledgeBaseId": "KBID123456" } } } }
Knowledge base response generation

Beispiel für ein Eingabeereignis

{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "{\"completion\":\" <answer>\\\\n<answer_part>\\\\n<text>\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n</text>\\\\n<sources>\\\\n<source>1234567-1234-1234-1234-123456789abc</source>\\\\n<source>2345678-2345-2345-2345-23456789abcd</source>\\\\n<source>3456789-3456-3456-3456-3456789abcde</source>\\\\n</sources>\\\\n</answer_part>\\\\n</answer>\",\"stop_reason\":\"stop_sequence\",\"stop\":\"\\\\n\\\\nHuman:\"}", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION" }

Beispielantwort

{ "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION", "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n", "references": [ {"sourceId": "1234567-1234-1234-1234-123456789abc"}, {"sourceId": "2345678-2345-2345-2345-23456789abcd"}, {"sourceId": "3456789-3456-3456-3456-3456789abcde"} ] } ] } } }
Post-processing

Beispiel für ein Eingabeereignis

{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "<final_response>\\nBased on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 1234567-1234-1234-1234-123456789abc, 2345678-2345-2345-2345-23456789abcd). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions.\\n</final_response>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "POST_PROCESSING" }

Beispielantwort

{ "promptType": "POST_PROCESSING", "postProcessingParsedResponse": { "responseText": "Based on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 24c62d8c-3e39-4ca1-9470-a91d641fe050, 197815ef-8798-4cb1-8aa5-35f5d6b28365). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions." } }

Um Beispiele für Lambda-Parser-Funktionen zu sehen, erweitern Sie den Abschnitt mit den Beispielen für Prompt-Vorlagen, die Sie sehen möchten. Die lambda_handler-Funktion gibt die analysierte Antwort an den Agenten zurück.

Das folgende Beispiel zeigt eine Lambda-Funktion für die Vorverarbeitung eines Parsers, in die geschrieben wurde. Python

import json import re import logging PRE_PROCESSING_RATIONALE_REGEX = "&lt;thinking&gt;(.*?)&lt;/thinking&gt;" PREPROCESSING_CATEGORY_REGEX = "&lt;category&gt;(.*?)&lt;/category&gt;" PREPROCESSING_PROMPT_TYPE = "PRE_PROCESSING" PRE_PROCESSING_RATIONALE_PATTERN = re.compile(PRE_PROCESSING_RATIONALE_REGEX, re.DOTALL) PREPROCESSING_CATEGORY_PATTERN = re.compile(PREPROCESSING_CATEGORY_REGEX, re.DOTALL) logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default PreProcessing prompt def lambda_handler(event, context): print("Lambda input: " + str(event)) logger.info("Lambda input: " + str(event)) prompt_type = event["promptType"] # Sanitize LLM response model_response = sanitize_response(event['invokeModelRawResponse']) if event["promptType"] == PREPROCESSING_PROMPT_TYPE: return parse_pre_processing(model_response) def parse_pre_processing(model_response): category_matches = re.finditer(PREPROCESSING_CATEGORY_PATTERN, model_response) rationale_matches = re.finditer(PRE_PROCESSING_RATIONALE_PATTERN, model_response) category = next((match.group(1) for match in category_matches), None) rationale = next((match.group(1) for match in rationale_matches), None) return { "promptType": "PRE_PROCESSING", "preProcessingParsedResponse": { "rationale": rationale, "isValidInput": get_is_valid_input(category) } } def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def get_is_valid_input(category): if category is not None and category.strip().upper() == "D" or category.strip().upper() == "E": return True return False

Die folgenden Beispiele zeigen eine Lambda-Funktion für den Orchestrierungsparser, in die geschrieben wurde. Python

Der Beispielcode unterscheidet sich je nachdem, ob Ihre Aktionsgruppe mit einem OpenAPI Schema oder mit Funktionsdetails definiert wurde:

  1. Um Beispiele für eine mit einem OpenAPI Schema definierte Aktionsgruppe zu sehen, wählen Sie die Registerkarte aus, die dem Modell entspricht, für das Sie Beispiele sehen möchten.

    Anthropic Claude 2.0
    import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) verb, resource_name, function = match.group(1), match.group(2), match.group(3) parameters = {} for arg in match.group(4).split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
    Anthropic Claude 2.1
    import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
    Anthropic Claude 3
    import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
  2. Um Beispiele für eine mit Funktionsdetails definierte Aktionsgruppe anzuzeigen, wählen Sie die Registerkarte aus, die dem Modell entspricht, für das Sie Beispiele anzeigen möchten.

    Anthropic Claude 2.0
    import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response) match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response) if not match and not match_function_schema: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) if match: schema_type = 'API' verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4) else: schema_type = 'FUNCTION' resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3) parameters = {} for arg in param_arg.split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
    Anthropic Claude 2.1
    import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
    Anthropic Claude 3
    import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }

Das folgende Beispiel zeigt eine Lambda-Funktion für die Generierung von Antworten in der Wissensdatenbank, in die geschrieben wurde. Python

import json import re import logging ANSWER_PART_REGEX = "&lt;answer_part\\s?>(.+?)&lt;/answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "&lt;text\\s?>(.+?)&lt;/text\\s?>" ANSWER_REFERENCE_PART_REGEX = "&lt;source\\s?>(.+?)&lt;/source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default KB response generation prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) raw_response = event['invokeModelRawResponse'] parsed_response = { 'promptType': 'KNOWLEDGE_BASE_RESPONSE_GENERATION', 'knowledgeBaseResponseGenerationParsedResponse': { 'generatedResponse': parse_generated_response(raw_response) } } logger.info(parsed_response) return parsed_response def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return { 'generatedResponseParts': generated_response_parts } def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references

Das folgende Beispiel zeigt eine Lambda-Funktion für die Nachbearbeitung eines Parsers, in die geschrieben wurde. Python

import json import re import logging FINAL_RESPONSE_REGEX = r"&lt;final_response>([\s\S]*?)&lt;/final_response>" FINAL_RESPONSE_PATTERN = re.compile(FINAL_RESPONSE_REGEX, re.DOTALL) logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default PostProcessing prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) raw_response = event['invokeModelRawResponse'] parsed_response = { 'promptType': 'POST_PROCESSING', 'postProcessingParsedResponse': {} } matcher = FINAL_RESPONSE_PATTERN.search(raw_response) if not matcher: raise Exception("Could not parse raw LLM output") response_text = matcher.group(1).strip() parsed_response['postProcessingParsedResponse']['responseText'] = response_text logger.info(parsed_response) return parsed_response