Amazon 베드록용 에이전트의 Lambda 파서 함수 - Amazon Bedrock

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon 베드록용 에이전트의 Lambda 파서 함수

각 프롬프트 템플릿에는 수정할 수 있는 파서 Lambda 함수가 포함되어 있습니다. 사용자 지정 파서 Lambda 함수를 작성하려면 에이전트가 전송하는 입력 이벤트와 에이전트가 Lambda 함수의 출력으로 예상하는 응답을 이해해야 합니다. 입력 이벤트의 변수를 조작하고 응답을 반환하는 핸들러 함수를 작성합니다. AWS Lambda 작동 방식에 대한 자세한 내용은 개발자 안내서의 이벤트 기반 호출을 참조하십시오. AWS Lambda

파서 Lambda 입력 이벤트

다음은 에이전트의 입력 이벤트의 일반적인 구조입니다. 필드를 사용하여 Lambda 핸들러 함수를 작성합니다.

{ "messageVersion": "1.0", "agent": { "name": "string", "id": "string", "alias": "string", "version": "string" }, "invokeModelRawResponse": "string", "promptType": "ORCHESTRATION | POST_PROCESSING | PRE_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION ", "overrideType": "OUTPUT_PARSER" }

다음 목록은 입력 이벤트 필드를 설명합니다.

  • messageVersion - Lambda 함수로 이동하는 이벤트 데이터의 형식과 Lambda 함수에서 나올 것으로 예상되는 응답 형식을 식별하는 메시지 버전입니다. Amazon Bedrock용 에이전트는 버전 1.0만 지원합니다.

  • agent - 프롬프트가 포함된 에이전트의 이름, ID, 별칭 및 버전에 대한 정보가 들어 있습니다.

  • invokeModelRawResponse - 출력을 파싱할 프롬프트의 원시 파운데이션 모델 출력입니다.

  • promptType - 출력을 파싱할 프롬프트 유형입니다.

  • overrideType - 이 Lambda 함수가 재정의하는 아티팩트입니다. 현재는 디폴트 파서를 재정의해야 한다는 의미만 OUTPUT_PARSER 지원됩니다.

파서 Lambda 응답

에이전트는 Lambda 함수에서 다음 형식과 일치하는 응답을 기대합니다. 에이전트는 응답을 사용하여 추가 오케스트레이션을 수행하거나 사용자에게 응답을 반환하는 데 도움을 줍니다. Lambda 함수 응답 필드를 사용하여 출력이 반환되는 방식을 구성합니다.

작업 그룹을 OpenAPI 스키마로 정의했는지 아니면 함수 세부 정보를 사용하여 정의했는지에 해당하는 탭을 선택합니다.

OpenAPI schema
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "verb": "string", "actionGroupInput": { "<parameter>": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }
Function details
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "actionGroupInput": { "<parameter>": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }

다음 목록은 Lambda 응답 필드를 설명합니다.

  • messageVersion - Lambda 함수로 이동하는 이벤트 데이터의 형식과 Lambda 함수에서 나올 것으로 예상되는 응답 형식을 식별하는 메시지 버전입니다. Amazon Bedrock용 에이전트는 버전 1.0만 지원합니다.

  • promptType - 현재 턴의 프롬프트 유형입니다.

  • preProcessingParsedResponse - PRE_PROCESSING 프롬프트 유형에서 파싱된 응답입니다.

  • orchestrationParsedResponse - ORCHESTRATION 프롬프트 유형에서 파싱된 응답입니다. 자세한 내용은 다음을 참조하세요.

  • knowledgeBaseResponseGenerationParsedResponse - KNOWLEDGE_BASE_RESPONSE_GENERATION 프롬프트 유형에서 파싱된 응답입니다.

  • postProcessingParsedResponse - POST_PROCESSING 프롬프트 유형에서 파싱된 응답입니다.

네 개의 프롬프트 템플릿의 구문 분석된 응답에 대한 자세한 내용은 다음 탭을 참조하십시오.

preProcessingParsedResponse
{ "isValidInput": "boolean", "rationale": "string" }

preProcessingParsedResponse에는 다음 필드가 포함됩니다.

  • isValidInput - 사용자 입력이 유효한지 여부를 지정합니다. 함수를 정의하여 사용자 입력의 유효성을 특성화하는 방법을 확인할 수 있습니다.

  • rationale - 사용자 입력 분류의 근거입니다. 이 근거는 모델이 원시 응답으로 제공하고, Lambda 함수가 이를 파싱하고, 에이전트가 사전 처리를 위해 트레이스에 이를 제시합니다.

orchestrationResponse

의 형식은 스키마를 사용하여 작업 그룹을 정의했는지 아니면 함수 세부 정보로 정의했는지에 orchestrationResponse 따라 달라집니다. OpenAPI

  • OpenAPI스키마를 사용하여 작업 그룹을 정의한 경우 응답은 다음 형식이어야 합니다.

    { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "verb": "string", "actionGroupInput": { "<parameter>": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } }
  • 함수 세부 정보가 포함된 작업 그룹을 정의한 경우 응답은 다음 형식이어야 합니다.

    { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "actionGroupInput": { "<parameter>": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } }

orchestrationParsedResponse에는 다음 필드가 포함됩니다.

  • rationale - 파운데이션 모델 출력을 기반으로 다음에 무엇을 해야 하는지에 대한 근거입니다. 모델 출력에서 파싱할 함수를 정의할 수 있습니다.

  • parsingErrorDetails - repromptResponse가 포함되며 이는 모델 응답을 파싱할 수 없을 때 원시 응답을 업데이트하도록 모델을 다시 요청하라는 메시지입니다. 함수를 정의하여 모델을 다시 프롬프트하는 방법을 조작할 수 있습니다.

  • responseDetails - 파운데이션 모델의 출력을 처리하는 방법에 대한 세부 정보가 들어 있습니다. invocationType이 포함되며 이는 에이전트가 취해야 할 다음 단계이고, invocationType과 일치해야 하는 두 번째 필드입니다. 다음 객체에서 발생 가능한 항목을 확인합니다.

    • agentAskUser - ASK_USER 간접 호출 유형과 호환됩니다. 이 간접 호출 유형은 오케스트레이션 단계를 종료합니다. 사용자에게 자세한 정보를 요청하는 responseText가 들어 있습니다. 이 필드를 조작하도록 함수를 정의할 수 있습니다.

    • actionGroupInvocation - ACTION_GROUP 간접 호출 유형과 호환됩니다. Lambda 함수를 정의하여 호출할 작업 그룹과 전달할 파라미터를 결정할 수 있습니다. 다음 필드를 포함합니다.

      • actionGroupName - 간접적으로 호출할 작업 그룹입니다.

      • OpenAPI스키마를 사용하여 작업 그룹을 정의한 경우 다음 필드는 필수 필드입니다.

        • apiName— 작업 그룹에서 호출할 API 작업의 이름.

        • verb— 사용할 API 작업의 메서드입니다.

      • 함수 세부 정보가 포함된 작업 그룹을 정의한 경우 다음 필드가 필요합니다.

        • functionName— 작업 그룹에서 호출할 함수의 이름.

      • actionGroupInput— API 작업 요청에서 지정할 매개 변수를 포함합니다.

    • agentKnowledgeBase - KNOWLEDGE_BASE 간접 호출 유형과 호환됩니다. 함수를 정의하여 지식 기반을 쿼리하는 방법을 결정할 수 있습니다. 다음 필드를 포함합니다.

      • knowledgeBaseId - 지식 기분의 고유한 식별자입니다.

      • searchQueryvalue 필드의 지식 베이스로 보낼 쿼리가 들어 있습니다.

    • agentFinalResponse - FINISH 간접 호출 유형과 호환됩니다. 이 간접 호출 유형은 오케스트레이션 단계를 종료합니다. responseText 필드에 있는 사용자에 대한 응답과 citations 객체의 응답에 대한 인용을 포함합니다.

knowledgeBaseResponseGenerationParsedResponse
{ "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }

knowledgeBaseResponseGenerationParsedResponse에는 지식 베이스를 쿼리하는 양식과 데이터 generatedResponse 원본에 대한 참조가 들어 있습니다.

postProcessingParsedResponse
{ "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }

postProcessingParsedResponse에는 다음 필드가 포함됩니다.

  • responseText - 최종 사용자에게 반환하기 위한 응답입니다. 함수를 정의하여 응답 형식을 지정할 수 있습니다.

  • citations - 응답에 대한 인용 목록이 포함됩니다. 각 인용에는 인용된 텍스트와 해당 문헌이 표시됩니다.

파서 Lambda 예제

예제 파서 Lambda 함수 입력 이벤트 및 응답을 보려면 다음 탭 중에서 선택하십시오.

Pre-processing

예제 입력 이벤트

{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": " <thinking>\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n</thinking>\n\n<category>B</category>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "PRE_PROCESSING" }

응답의 예

{ "promptType": "PRE_PROCESSING", "preProcessingParsedResponse": { "rationale": "\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n", "isValidInput": false } }
Orchestration

예제 입력 이벤트

{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID23456::Search function.\\n\\n</scratchpad>\\n\\n<function_call>GET::x_amz_knowledgebase_KBID123456::Search(searchQuery=\"What is the phone number I can call?\)", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "ORCHESTRATION" }

응답의 예

{ "promptType": "ORCHESTRATION", "orchestrationParsedResponse": { "rationale": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call Farmers.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID123456::Search function.", "responseDetails": { "invocationType": "KNOWLEDGE_BASE", "agentKnowledgeBase": { "searchQuery": { "value": "What is the phone number I can call?" }, "knowledgeBaseId": "KBID123456" } } } }
Knowledge base response generation

예제 입력 이벤트

{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "{\"completion\":\" <answer>\\\\n<answer_part>\\\\n<text>\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n</text>\\\\n<sources>\\\\n<source>1234567-1234-1234-1234-123456789abc</source>\\\\n<source>2345678-2345-2345-2345-23456789abcd</source>\\\\n<source>3456789-3456-3456-3456-3456789abcde</source>\\\\n</sources>\\\\n</answer_part>\\\\n</answer>\",\"stop_reason\":\"stop_sequence\",\"stop\":\"\\\\n\\\\nHuman:\"}", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION" }

응답의 예

{ "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION", "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n", "references": [ {"sourceId": "1234567-1234-1234-1234-123456789abc"}, {"sourceId": "2345678-2345-2345-2345-23456789abcd"}, {"sourceId": "3456789-3456-3456-3456-3456789abcde"} ] } ] } } }
Post-processing

예제 입력 이벤트

{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "<final_response>\\nBased on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 1234567-1234-1234-1234-123456789abc, 2345678-2345-2345-2345-23456789abcd). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions.\\n</final_response>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "POST_PROCESSING" }

응답의 예

{ "promptType": "POST_PROCESSING", "postProcessingParsedResponse": { "responseText": "Based on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 24c62d8c-3e39-4ca1-9470-a91d641fe050, 197815ef-8798-4cb1-8aa5-35f5d6b28365). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions." } }

예제 파서 Lambda 함수를 보려면 보려는 프롬프트 템플릿 예제의 섹션을 확장하십시오. lambda_handler 함수는 파싱된 응답을 에이전트에 반환합니다.

다음 예제는 사전 처리 파서 Lambda 함수를 작성한 것을 보여줍니다. Python

import json import re import logging PRE_PROCESSING_RATIONALE_REGEX = "&lt;thinking&gt;(.*?)&lt;/thinking&gt;" PREPROCESSING_CATEGORY_REGEX = "&lt;category&gt;(.*?)&lt;/category&gt;" PREPROCESSING_PROMPT_TYPE = "PRE_PROCESSING" PRE_PROCESSING_RATIONALE_PATTERN = re.compile(PRE_PROCESSING_RATIONALE_REGEX, re.DOTALL) PREPROCESSING_CATEGORY_PATTERN = re.compile(PREPROCESSING_CATEGORY_REGEX, re.DOTALL) logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default PreProcessing prompt def lambda_handler(event, context): print("Lambda input: " + str(event)) logger.info("Lambda input: " + str(event)) prompt_type = event["promptType"] # Sanitize LLM response model_response = sanitize_response(event['invokeModelRawResponse']) if event["promptType"] == PREPROCESSING_PROMPT_TYPE: return parse_pre_processing(model_response) def parse_pre_processing(model_response): category_matches = re.finditer(PREPROCESSING_CATEGORY_PATTERN, model_response) rationale_matches = re.finditer(PRE_PROCESSING_RATIONALE_PATTERN, model_response) category = next((match.group(1) for match in category_matches), None) rationale = next((match.group(1) for match in rationale_matches), None) return { "promptType": "PRE_PROCESSING", "preProcessingParsedResponse": { "rationale": rationale, "isValidInput": get_is_valid_input(category) } } def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def get_is_valid_input(category): if category is not None and category.strip().upper() == "D" or category.strip().upper() == "E": return True return False

다음 예제는 로 작성된 오케스트레이션 파서 Lambda 함수를 보여줍니다. Python

예제 코드는 작업 그룹이 OpenAPI 스키마로 정의되었는지 아니면 함수 세부 정보로 정의되었는지에 따라 달라집니다.

  1. OpenAPI스키마로 정의된 작업 그룹의 예를 보려면 예제를 보려는 모델에 해당하는 탭을 선택하십시오.

    Anthropic Claude 2.0
    import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) verb, resource_name, function = match.group(1), match.group(2), match.group(3) parameters = {} for arg in match.group(4).split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
    Anthropic Claude 2.1
    import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
    Anthropic Claude 3
    import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
  2. 함수 세부 정보로 정의된 작업 그룹의 예를 보려면 예제를 보려는 모델에 해당하는 탭을 선택하십시오.

    Anthropic Claude 2.0
    import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response) match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response) if not match and not match_function_schema: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) if match: schema_type = 'API' verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4) else: schema_type = 'FUNCTION' resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3) parameters = {} for arg in param_arg.split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
    Anthropic Claude 2.1
    import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
    Anthropic Claude 3
    import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }

다음 예제는 지식 기반 응답 생성 파서 Lambda 함수가 작성된 것을 보여줍니다. Python

import json import re import logging ANSWER_PART_REGEX = "&lt;answer_part\\s?>(.+?)&lt;/answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "&lt;text\\s?>(.+?)&lt;/text\\s?>" ANSWER_REFERENCE_PART_REGEX = "&lt;source\\s?>(.+?)&lt;/source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default KB response generation prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) raw_response = event['invokeModelRawResponse'] parsed_response = { 'promptType': 'KNOWLEDGE_BASE_RESPONSE_GENERATION', 'knowledgeBaseResponseGenerationParsedResponse': { 'generatedResponse': parse_generated_response(raw_response) } } logger.info(parsed_response) return parsed_response def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return { 'generatedResponseParts': generated_response_parts } def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references

다음 예제는 에 작성된 포스트 프로세싱 파서 Lambda 함수를 보여줍니다. Python

import json import re import logging FINAL_RESPONSE_REGEX = r"&lt;final_response>([\s\S]*?)&lt;/final_response>" FINAL_RESPONSE_PATTERN = re.compile(FINAL_RESPONSE_REGEX, re.DOTALL) logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default PostProcessing prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) raw_response = event['invokeModelRawResponse'] parsed_response = { 'promptType': 'POST_PROCESSING', 'postProcessingParsedResponse': {} } matcher = FINAL_RESPONSE_PATTERN.search(raw_response) if not matcher: raise Exception("Could not parse raw LLM output") response_text = matcher.group(1).strip() parsed_response['postProcessingParsedResponse']['responseText'] = response_text logger.info(parsed_response) return parsed_response