Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Setiap template prompt menyertakan fungsi Lambda parser yang dapat Anda modifikasi. Untuk menulis fungsi Lambda parser kustom, Anda harus memahami peristiwa masukan yang dikirim agen Anda dan respons yang diharapkan agen sebagai output dari fungsi Lambda. Anda menulis fungsi handler untuk memanipulasi variabel dari peristiwa masukan dan mengembalikan respons. Untuk informasi selengkapnya tentang cara AWS Lambda kerja, lihat Pemanggilan berbasis peristiwa di Panduan Pengembang. AWS Lambda
Acara masukan Parser Lambda
Berikut ini adalah struktur umum dari peristiwa input dari agen. Gunakan bidang untuk menulis fungsi penangan Lambda Anda.
{
"messageVersion": "1.0",
"agent": {
"name": "string",
"id": "string",
"alias": "string",
"version": "string"
},
"invokeModelRawResponse": "string",
"promptType": "ORCHESTRATION | ROUTING_CLASSIFIER | POST_PROCESSING | PRE_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION | MEMORY_SUMMARIZATION",
"overrideType": "OUTPUT_PARSER"
}
Daftar berikut menjelaskan bidang peristiwa masukan:
-
messageVersion
— Versi pesan yang mengidentifikasi format data peristiwa yang masuk ke fungsi Lambda dan format respons yang diharapkan dari fungsi Lambda. Amazon Bedrock Agents hanya mendukung versi 1.0. -
agent
— Berisi informasi tentang nama, ID, alias, dan versi agen yang menjadi milik petunjuknya. -
invokeModelRawResponse
— Output model pondasi mentah dari prompt yang outputnya akan diurai. -
promptType
— Jenis prompt yang outputnya akan diurai. -
overrideType
— Artefak yang dikesampingkan oleh fungsi Lambda ini. Saat ini, hanyaOUTPUT_PARSER
didukung, yang menunjukkan bahwa parser default akan diganti.
Respons Parser Lambda
Agen Anda mengharapkan respons dari fungsi Lambda Anda dan menggunakan respons untuk mengambil tindakan lebih lanjut atau membantunya mengembalikan respons kepada pengguna. Agen Anda mengeksekusi tindakan berikutnya yang direkomendasikan oleh model agen. Tindakan selanjutnya dapat dijalankan dalam urutan serial atau paralel tergantung pada model agen dan kapan agen dibuat dan disiapkan.
Jika Anda telah membuat dan menyiapkan agen Anda sebelum 4 Oktober 2024 dan jika agen Anda menggunakan Anthropic Claude 3 Sonnet atau Anthropic Claude 3.5 Sonnet model, secara default, tindakan teratas berikutnya yang direkomendasikan oleh model agen akan dijalankan dalam urutan serial.
Jika Anda telah membuat agen baru atau menyiapkan agen yang ada setelah 10 Oktober 2024 dan agen Anda menggunakan Anthropic Claude 3 Sonnet, Anthropic Claude 3.5 Sonnet, atau apapun non-Anthropic model, tindakan langkah selanjutnya yang direkomendasikan oleh model agen akan berjalan secara paralel. Ini berarti bahwa beberapa tindakan, misalnya, campuran fungsi kelompok tindakan dan basis pengetahuan, akan dieksekusi secara paralel. Ini mengurangi jumlah panggilan yang dilakukan ke model yang mengurangi latensi keseluruhan.
Anda dapat mengaktifkan tindakan paralel untuk agen Anda yang dibuat dan disiapkan sebelum 4 Oktober 2024 dengan menelepon PrepareAgentAPIatau dengan memilih Siapkan di pembuat agen agen Anda di konsol. Setelah agen disiapkan, Anda akan melihat template prompt yang diperbarui dan skema Lambda parser versi baru.
Contoh parser Lambda respon
Berikut ini adalah contoh struktur umum respons dari agen yang menjalankan tindakan yang direkomendasikan atas berikutnya dalam urutan serial dan agen yang menjalankan tindakan selanjutnya secara paralel. Gunakan bidang respons fungsi Lambda untuk mengonfigurasi bagaimana output dikembalikan.
Contoh respons dari agen yang menjalankan tindakan yang direkomendasikan teratas berikutnya dalam urutan serial
Pilih tab yang sesuai dengan apakah Anda mendefinisikan grup tindakan dengan OpenAPI skema atau dengan rincian fungsi:
catatan
MessageVersion 1.0
Ini menunjukkan bahwa agen menjalankan tindakan yang direkomendasikan teratas berikutnya dalam urutan serial.
- OpenAPI schema
-
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | ROUTING_CLASSIFIER | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "AGENT_COLLABORATOR | ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "agentCollaboratorInvocation": { "agentCollaboratorName": "string", "input": { "text": "string" } } ... } }, "routingClassifierParsedResponse": { "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "type": "AGENT | LAST_AGENT | UNDECIDED", "agentCollaboratorInvocation": { "agentCollaboratorName": "string", "input": { "text": "string" } } } } } "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "id": "string", "verb": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } } - Function details
-
{ "messageVersion": "1.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "id": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }
{
"messageVersion": "1.0",
"promptType": "ORCHESTRATION | PRE_PROCESSING | ROUTING_CLASSIFIER | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
"preProcessingParsedResponse": {
"isValidInput": "boolean",
"rationale": "string"
},
"orchestrationParsedResponse": {
"rationale": "string",
"parsingErrorDetails": {
"repromptResponse": "string"
},
"responseDetails": {
"invocationType": "AGENT_COLLABORATOR | ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
"agentAskUser": {
"responseText": "string",
"id": "string"
},
"agentCollaboratorInvocation": {
"agentCollaboratorName": "string",
"input": {
"text": "string"
}
}
...
}
},
"routingClassifierParsedResponse": {
"parsingErrorDetails": {
"repromptResponse": "string"
},
"responseDetails": {
"type": "AGENT | LAST_AGENT | UNDECIDED",
"agentCollaboratorInvocation": {
"agentCollaboratorName": "string",
"input": {
"text": "string"
}
}
}
}
}
"actionGroupInvocation": {
"actionGroupName": "string",
"apiName": "string",
"id": "string",
"verb": "string",
"actionGroupInput": {
"<parameter>
": {
"value": "string"
},
...
}
},
"agentKnowledgeBase": {
"knowledgeBaseId": "string",
"id": "string",
"searchQuery": {
"value": "string"
}
},
"agentFinalResponse": {
"responseText": "string",
"citations": {
"generatedResponseParts": [{
"text": "string",
"references": [{"sourceId": "string"}]
}]
}
},
}
},
"knowledgeBaseResponseGenerationParsedResponse": {
"generatedResponse": {
"generatedResponseParts": [
{
"text": "string",
"references": [
{"sourceId": "string"},
...
]
}
]
}
},
"postProcessingParsedResponse": {
"responseText": "string",
"citations": {
"generatedResponseParts": [{
"text": "string",
"references": [{
"sourceId": "string"
}]
}]
}
}
}
Contoh respons dari agen yang menjalankan tindakan selanjutnya secara paralel
Pilih tab yang sesuai dengan apakah Anda mendefinisikan grup tindakan dengan OpenAPI skema atau dengan rincian fungsi:
catatan
MessageVersion 2.0
Ini menunjukkan bahwa agen menjalankan tindakan yang direkomendasikan berikutnya secara paralel
- OpenAPI schema
-
{ "messageVersion": "2.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocations": [ { "actionGroupName": "string", "apiName": "string", "verb": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } } ], "agentKnowledgeBases": [ { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } } ], "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } } - Function details
-
{ "messageVersion": "2.0", "promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION", "preProcessingParsedResponse": { "isValidInput": "boolean", "rationale": "string" }, "orchestrationParsedResponse": { "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string" }, "actionGroupInvocations": [ { "actionGroupName": "string", "functionName": "string", "actionGroupInput": { "
<parameter>
"": { "value": "string" }, ... } } ], "agentKnowledgeBases": [ { "knowledgeBaseId": "string", "searchQuery": { "value": "string" } } ], "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{"sourceId": "string"}] }] } }, } }, "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] } ] } }, "postProcessingParsedResponse": { "responseText": "string", "citations": { "generatedResponseParts": [{ "text": "string", "references": [{ "sourceId": "string" }] }] } } }
{
"messageVersion": "2.0",
"promptType": "ORCHESTRATION | PRE_PROCESSING | POST_PROCESSING | KNOWLEDGE_BASE_RESPONSE_GENERATION",
"preProcessingParsedResponse": {
"isValidInput": "boolean",
"rationale": "string"
},
"orchestrationParsedResponse": {
"rationale": "string",
"parsingErrorDetails": {
"repromptResponse": "string"
},
"responseDetails": {
"invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER",
"agentAskUser": {
"responseText": "string"
},
"actionGroupInvocations": [
{
"actionGroupName": "string",
"apiName": "string",
"verb": "string",
"actionGroupInput": {
"<parameter>
": {
"value": "string"
},
...
}
}
],
"agentKnowledgeBases": [
{
"knowledgeBaseId": "string",
"searchQuery": {
"value": "string"
}
}
],
"agentFinalResponse": {
"responseText": "string",
"citations": {
"generatedResponseParts": [{
"text": "string",
"references": [{"sourceId": "string"}]
}]
}
},
}
},
"knowledgeBaseResponseGenerationParsedResponse": {
"generatedResponse": {
"generatedResponseParts": [
{
"text": "string",
"references": [
{"sourceId": "string"},
...
]
}
]
}
},
"postProcessingParsedResponse": {
"responseText": "string",
"citations": {
"generatedResponseParts": [{
"text": "string",
"references": [{
"sourceId": "string"
}]
}]
}
}
}
Daftar berikut menjelaskan bidang respons Lambda:
-
messageVersion
— Versi pesan yang mengidentifikasi format data peristiwa yang masuk ke fungsi Lambda dan format respons yang diharapkan dari fungsi Lambda. -
promptType
— Jenis prompt dari belokan saat ini. -
preProcessingParsedResponse
— Respons yang diuraikan untuk tipePRE_PROCESSING
prompt. -
orchestrationParsedResponse
— Respons yang diuraikan untuk tipeORCHESTRATION
prompt. Lihat di bawah untuk lebih jelasnya. -
knowledgeBaseResponseGenerationParsedResponse
— Respons yang diuraikan untuk tipeKNOWLEDGE_BASE_RESPONSE_GENERATION
prompt. -
postProcessingParsedResponse
— Respons yang diuraikan untuk tipePOST_PROCESSING
prompt.
Untuk detail selengkapnya tentang tanggapan yang diuraikan untuk empat templat prompt, lihat tab berikut.
- preProcessingParsedResponse
-
{ "isValidInput": "boolean", "rationale": "string" }
preProcessingParsedResponse
Berisi bidang-bidang berikut.-
isValidInput
- Menentukan apakah input pengguna valid atau tidak. Anda dapat menentukan fungsi untuk menentukan bagaimana mengkarakterisasi validitas input pengguna. -
rationale
— Alasan untuk kategorisasi input pengguna. Alasan ini disediakan oleh model dalam respons mentah, fungsi Lambda menguraikannya, dan agen menyajikannya dalam jejak untuk pra-pemrosesan.
-
- orchestrationResponse
-
Format
orchestrationResponse
tergantung pada apakah Anda mendefinisikan grup tindakan dengan OpenAPI skema atau rincian fungsi:-
Jika Anda mendefinisikan grup tindakan dengan OpenAPI skema, respon harus dalam format berikut:
{ "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "apiName": "string", "id": "string", "verb": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } } -
Jika Anda mendefinisikan grup tindakan dengan detail fungsi, respons harus dalam format berikut:
{ "rationale": "string", "parsingErrorDetails": { "repromptResponse": "string" }, "responseDetails": { "invocationType": "ACTION_GROUP | KNOWLEDGE_BASE | FINISH | ASK_USER", "agentAskUser": { "responseText": "string", "id": "string" }, "actionGroupInvocation": { "actionGroupName": "string", "functionName": "string", "id": "string", "actionGroupInput": { "
<parameter>
": { "value": "string" }, ... } }, "agentKnowledgeBase": { "knowledgeBaseId": "string", "id": "string", "searchQuery": { "value": "string" } }, "agentFinalResponse": { "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ {"sourceId": "string"}, ... ] }, ... ] } }, } }
orchestrationParsedResponse
Berisi bidang-bidang berikut:-
rationale
— Alasan untuk apa yang harus dilakukan selanjutnya, berdasarkan output model pondasi. Anda dapat menentukan fungsi untuk mengurai dari output model. -
parsingErrorDetails
— BerisirepromptResponse
, yang merupakan pesan untuk meminta ulang model untuk memperbarui respons mentahnya ketika respons model tidak dapat diuraikan. Anda dapat menentukan fungsi untuk memanipulasi cara mem-reprompt model. -
responseDetails
— Berisi rincian tentang cara menangani output dari model pondasi. BerisiinvocationType
, yang merupakan langkah selanjutnya untuk diambil agen, dan bidang kedua yang harus cocok denganinvocationType
. Objek berikut dimungkinkan.-
agentAskUser
— Kompatibel dengan jenisASK_USER
pemanggilan. Jenis pemanggilan ini mengakhiri langkah orkestrasi. BerisiresponseText
untuk meminta pengguna untuk informasi lebih lanjut. Anda dapat menentukan fungsi Anda untuk memanipulasi bidang ini. -
actionGroupInvocation
— Kompatibel dengan jenisACTION_GROUP
pemanggilan. Anda dapat menentukan fungsi Lambda Anda untuk menentukan grup tindakan yang akan dipanggil dan parameter untuk diteruskan. Berisi bidang-bidang berikut:-
actionGroupName
— Kelompok aksi untuk memanggil. -
Bidang berikut diperlukan jika Anda mendefinisikan grup tindakan dengan OpenAPI skema:
-
apiName
— Nama API operasi yang akan dipanggil dalam kelompok tindakan. -
verb
— Metode API operasi yang akan digunakan.
-
-
Bidang berikut diperlukan jika Anda mendefinisikan grup tindakan dengan detail fungsi:
-
functionName
— Nama fungsi yang akan dipanggil dalam grup tindakan.
-
-
actionGroupInput
— Berisi parameter untuk ditentukan dalam permintaan API operasi.
-
-
agentKnowledgeBase
— Kompatibel dengan jenisKNOWLEDGE_BASE
pemanggilan. Anda dapat menentukan fungsi Anda untuk menentukan cara menanyakan basis pengetahuan. Berisi bidang-bidang berikut:-
knowledgeBaseId
— Pengidentifikasi unik dari basis pengetahuan. -
searchQuery
— Berisi kueri untuk dikirim ke basis pengetahuan divalue
lapangan.
-
-
agentFinalResponse
— Kompatibel dengan jenisFINISH
pemanggilan. Jenis pemanggilan ini mengakhiri langkah orkestrasi. Berisi respons terhadap pengguna diresponseText
bidang dan kutipan untuk respons dicitations
objek.
-
-
- knowledgeBaseResponseGenerationParsedResponse
-
{ "generatedResponse": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }
knowledgeBaseResponseGenerationParsedResponse
BerisigeneratedResponse
dari query basis pengetahuan dan referensi untuk sumber data. - postProcessingParsedResponse
-
{ "responseText": "string", "citations": { "generatedResponseParts": [ { "text": "string", "references": [ { "sourceId": "string" }, ... ] }, ... ] } }
postProcessingParsedResponse
Berisi bidang-bidang berikut:-
responseText
— Respons untuk kembali ke pengguna akhir. Anda dapat menentukan fungsi untuk memformat respons. -
citations
— Berisi daftar kutipan untuk tanggapan. Setiap kutipan menunjukkan teks yang dikutip dan referensinya.
-
{
"isValidInput": "boolean",
"rationale": "string"
}
preProcessingParsedResponse
Berisi bidang-bidang berikut.
-
isValidInput
- Menentukan apakah input pengguna valid atau tidak. Anda dapat menentukan fungsi untuk menentukan bagaimana mengkarakterisasi validitas input pengguna. -
rationale
— Alasan untuk kategorisasi input pengguna. Alasan ini disediakan oleh model dalam respons mentah, fungsi Lambda menguraikannya, dan agen menyajikannya dalam jejak untuk pra-pemrosesan.
Contoh Parser Lambda
Untuk melihat contoh parser Lambda fungsi input peristiwa dan tanggapan, pilih dari tab berikut.
- Pre-processing
-
Contoh acara masukan
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": " <thinking>\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n</thinking>\n\n<category>B</category>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "PRE_PROCESSING" }
Contoh respon
{ "promptType": "PRE_PROCESSING", "preProcessingParsedResponse": { "rationale": "\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n", "isValidInput": false } }
- Orchestration
-
Contoh acara masukan
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID23456::Search function.\\n\\n</scratchpad>\\n\\n<function_call>GET::x_amz_knowledgebase_KBID123456::Search(searchQuery=\"What is the phone number I can call?\)", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "ORCHESTRATION" }
Contoh respon
{ "promptType": "ORCHESTRATION", "orchestrationParsedResponse": { "rationale": "To answer this question, I will:\\n\\n1. Call the GET::x_amz_knowledgebase_KBID123456::Search function to search for a phone number to call Farmers.\\n\\nI have checked that I have access to the GET::x_amz_knowledgebase_KBID123456::Search function.", "responseDetails": { "invocationType": "KNOWLEDGE_BASE", "agentKnowledgeBase": { "searchQuery": { "value": "What is the phone number I can call?" }, "knowledgeBaseId": "KBID123456" } } } }
- Knowledge base response generation
-
Contoh acara masukan
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "{\"completion\":\" <answer>\\\\n<answer_part>\\\\n<text>\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n</text>\\\\n<sources>\\\\n<source>1234567-1234-1234-1234-123456789abc</source>\\\\n<source>2345678-2345-2345-2345-23456789abcd</source>\\\\n<source>3456789-3456-3456-3456-3456789abcde</source>\\\\n</sources>\\\\n</answer_part>\\\\n</answer>\",\"stop_reason\":\"stop_sequence\",\"stop\":\"\\\\n\\\\nHuman:\"}", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION" }
Contoh respon
{ "promptType": "KNOWLEDGE_BASE_RESPONSE_GENERATION", "knowledgeBaseResponseGenerationParsedResponse": { "generatedResponse": { "generatedResponseParts": [ { "text": "\\\\nThe search results contain information about different types of insurance benefits, including personal injury protection (PIP), medical payments coverage, and lost wages coverage. PIP typically covers reasonable medical expenses for injuries caused by an accident, as well as income continuation, child care, loss of services, and funerals. Medical payments coverage provides payment for medical treatment resulting from a car accident. Who pays lost wages due to injuries depends on the laws in your state and the coverage purchased.\\\\n", "references": [ {"sourceId": "1234567-1234-1234-1234-123456789abc"}, {"sourceId": "2345678-2345-2345-2345-23456789abcd"}, {"sourceId": "3456789-3456-3456-3456-3456789abcde"} ] } ] } } }
- Post-processing
-
Contoh acara masukan
{ "agent": { "alias": "TSTALIASID", "id": "AGENTID123", "name": "InsuranceAgent", "version": "DRAFT" }, "invokeModelRawResponse": "<final_response>\\nBased on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 1234567-1234-1234-1234-123456789abc, 2345678-2345-2345-2345-23456789abcd). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions.\\n</final_response>", "messageVersion": "1.0", "overrideType": "OUTPUT_PARSER", "promptType": "POST_PROCESSING" }
Contoh respon
{ "promptType": "POST_PROCESSING", "postProcessingParsedResponse": { "responseText": "Based on your request, I searched our insurance benefit information database for details. The search results indicate that insurance policies may cover different types of benefits, depending on the policy and state laws. Specifically, the results discussed personal injury protection (PIP) coverage, which typically covers medical expenses for insured individuals injured in an accident (cited sources: 24c62d8c-3e39-4ca1-9470-a91d641fe050, 197815ef-8798-4cb1-8aa5-35f5d6b28365). PIP may pay for costs like medical care, lost income replacement, childcare expenses, and funeral costs. Medical payments coverage was also mentioned as another option that similarly covers medical treatment costs for the policyholder and others injured in a vehicle accident involving the insured vehicle. The search results further noted that whether lost wages are covered depends on the state and coverage purchased. Please let me know if you need any clarification or have additional questions." } }
- Memory summarization
-
Contoh acara masukan
{ "messageVersion": "1.0", "promptType": "MEMORY_SUMMARIZATION", "invokeModelRawResponse": "<summary> <topic name="user goals">User initiated the conversation with a greeting.</topic> </summary>" }
Contoh respon
{"topicwiseSummaries": [ { "topic": "TopicName1", "summary": "My Topic 1 Summary" } ... ] }
Contoh acara masukan
{
"agent": {
"alias": "TSTALIASID",
"id": "AGENTID123",
"name": "InsuranceAgent",
"version": "DRAFT"
},
"invokeModelRawResponse": " <thinking>\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n</thinking>\n\n<category>B</category>",
"messageVersion": "1.0",
"overrideType": "OUTPUT_PARSER",
"promptType": "PRE_PROCESSING"
}
Contoh respon
{
"promptType": "PRE_PROCESSING",
"preProcessingParsedResponse": {
"rationale": "\nThe user is asking about the instructions provided to the function calling agent. This input is trying to gather information about what functions/API's or instructions our function calling agent has access to. Based on the categories provided, this input belongs in Category B.\n",
"isValidInput": false
}
}
Untuk melihat contoh fungsi Lambda parser, perluas bagian untuk contoh template prompt yang ingin Anda lihat. lambda_handler
Fungsi mengembalikan respon yang diuraikan ke agen.
Contoh berikut menunjukkan fungsi Lambda parser pra-pemrosesan ditulis Python.
import json
import re
import logging
PRE_PROCESSING_RATIONALE_REGEX = "<thinking>(.*?)</thinking>"
PREPROCESSING_CATEGORY_REGEX = "<category>(.*?)</category>"
PREPROCESSING_PROMPT_TYPE = "PRE_PROCESSING"
PRE_PROCESSING_RATIONALE_PATTERN = re.compile(PRE_PROCESSING_RATIONALE_REGEX, re.DOTALL)
PREPROCESSING_CATEGORY_PATTERN = re.compile(PREPROCESSING_CATEGORY_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default PreProcessing prompt
def lambda_handler(event, context):
print("Lambda input: " + str(event))
logger.info("Lambda input: " + str(event))
prompt_type = event["promptType"]
# Sanitize LLM response
model_response = sanitize_response(event['invokeModelRawResponse'])
if event["promptType"] == PREPROCESSING_PROMPT_TYPE:
return parse_pre_processing(model_response)
def parse_pre_processing(model_response):
category_matches = re.finditer(PREPROCESSING_CATEGORY_PATTERN, model_response)
rationale_matches = re.finditer(PRE_PROCESSING_RATIONALE_PATTERN, model_response)
category = next((match.group(1) for match in category_matches), None)
rationale = next((match.group(1) for match in rationale_matches), None)
return {
"promptType": "PRE_PROCESSING",
"preProcessingParsedResponse": {
"rationale": rationale,
"isValidInput": get_is_valid_input(category)
}
}
def sanitize_response(text):
pattern = r"(\\n*)"
text = re.sub(pattern, r"\n", text)
return text
def get_is_valid_input(category):
if category is not None and category.strip().upper() == "D" or category.strip().upper() == "E":
return True
return False
Pra-pemrosesan
Contoh berikut menunjukkan fungsi Lambda parser pra-pemrosesan ditulis Python.
import json
import re
import logging
PRE_PROCESSING_RATIONALE_REGEX = "<thinking>(.*?)</thinking>"
PREPROCESSING_CATEGORY_REGEX = "<category>(.*?)</category>"
PREPROCESSING_PROMPT_TYPE = "PRE_PROCESSING"
PRE_PROCESSING_RATIONALE_PATTERN = re.compile(PRE_PROCESSING_RATIONALE_REGEX, re.DOTALL)
PREPROCESSING_CATEGORY_PATTERN = re.compile(PREPROCESSING_CATEGORY_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default PreProcessing prompt
def lambda_handler(event, context):
print("Lambda input: " + str(event))
logger.info("Lambda input: " + str(event))
prompt_type = event["promptType"]
# Sanitize LLM response
model_response = sanitize_response(event['invokeModelRawResponse'])
if event["promptType"] == PREPROCESSING_PROMPT_TYPE:
return parse_pre_processing(model_response)
def parse_pre_processing(model_response):
category_matches = re.finditer(PREPROCESSING_CATEGORY_PATTERN, model_response)
rationale_matches = re.finditer(PRE_PROCESSING_RATIONALE_PATTERN, model_response)
category = next((match.group(1) for match in category_matches), None)
rationale = next((match.group(1) for match in rationale_matches), None)
return {
"promptType": "PRE_PROCESSING",
"preProcessingParsedResponse": {
"rationale": rationale,
"isValidInput": get_is_valid_input(category)
}
}
def sanitize_response(text):
pattern = r"(\\n*)"
text = re.sub(pattern, r"\n", text)
return text
def get_is_valid_input(category):
if category is not None and category.strip().upper() == "D" or category.strip().upper() == "E":
return True
return False
Contoh berikut menunjukkan parser orkestrasi fungsi Lambda ditulis dalam Python.
Kode contoh berbeda tergantung pada apakah grup tindakan Anda didefinisikan dengan OpenAPI skema atau dengan rincian fungsi:
-
Untuk melihat contoh untuk grup tindakan yang didefinisikan dengan OpenAPI skema, pilih tab yang sesuai dengan model yang ingin Anda lihat contohnya.
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) verb, resource_name, function = match.group(1), match.group(2), match.group(3) parameters = {} for arg in match.group(4).split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
-
Untuk melihat contoh grup tindakan yang ditentukan dengan detail fungsi, pilih tab yang sesuai dengan model yang ingin Anda lihat contohnya.
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response) match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response) if not match and not match_function_schema: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) if match: schema_type = 'API' verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4) else: schema_type = 'FUNCTION' resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3) parameters = {} for arg in param_arg.split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
Orkestrasi
Contoh berikut menunjukkan parser orkestrasi fungsi Lambda ditulis dalam Python.
Kode contoh berbeda tergantung pada apakah grup tindakan Anda didefinisikan dengan OpenAPI skema atau dengan rincian fungsi:
-
Untuk melihat contoh untuk grup tindakan yang didefinisikan dengan OpenAPI skema, pilih tab yang sesuai dengan model yang ingin Anda lihat contohnya.
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) verb, resource_name, function = match.group(1), match.group(2), match.group(3) parameters = {} for arg in match.group(4).split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) verb, resource_name, function = match.group(1), match.group(2), match.group(3) parameters = {} for arg in match.group(4).split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
-
Untuk melihat contoh grup tindakan yang ditentukan dengan detail fungsi, pilih tab yang sesuai dengan model yang ingin Anda lihat contohnya.
- Anthropic Claude 2.0
-
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response) match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response) if not match and not match_function_schema: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) if match: schema_type = 'API' verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4) else: schema_type = 'FUNCTION' resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3) parameters = {} for arg in param_arg.split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 2.1
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3
-
import logging import re import xml.etree.ElementTree as ET RATIONALE_REGEX_LIST = [ "(.*?)(<function_calls>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_calls>" ASK_USER_FUNCTION_CALL_REGEX = r"<tool_name>user::askuser</tool_name>" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_TOOL_NAME_REGEX = r"<tool_name>((.|\n)*?)</tool_name>" ASK_USER_TOOL_NAME_PATTERN = re.compile(ASK_USER_TOOL_NAME_REGEX, re.DOTALL) TOOL_PARAMETERS_REGEX = r"<parameters>((.|\n)*?)</parameters>" TOOL_PARAMETERS_PATTERN = re.compile(TOOL_PARAMETERS_REGEX, re.DOTALL) ASK_USER_TOOL_PARAMETER_REGEX = r"<question>((.|\n)*?)</question>" ASK_USER_TOOL_PARAMETER_PATTERN = re.compile(ASK_USER_TOOL_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX = r"(?<=<function_calls>)(.*)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user::askuser function call. Please try again with the correct argument added." ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <invoke> <tool_name>user::askuser</tool_name><parameters><question>$QUESTION</question></parameters></invoke>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls must be: <invoke> <tool_name>$TOOL_NAME</tool_name> <parameters> <$PARAMETER_NAME>$PARAMETER_VALUE</$PARAMETER_NAME>...</parameters></invoke>." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next( (pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_llm_response) params = parameters_matches.group(1).strip() ask_user_question_matcher = ASK_USER_TOOL_PARAMETER_PATTERN.search(params) if ask_user_question_matcher: ask_user_question = ask_user_question_matcher.group(1) return ask_user_question raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX, sanitized_response) if not match: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) tool_name_matches = ASK_USER_TOOL_NAME_PATTERN.search(sanitized_response) tool_name = tool_name_matches.group(1) parameters_matches = TOOL_PARAMETERS_PATTERN.search(sanitized_response) params = parameters_matches.group(1).strip() action_split = tool_name.split('::') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[0].strip() function = action_split[1].strip() xml_tree = ET.ElementTree(ET.fromstring("<parameters>{}</parameters>".format(params))) parameters = {} for elem in xml_tree.iter(): if elem.text: parameters[elem.tag] = {'value': elem.text.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
- Anthropic Claude 3.5
-
import json import logging import re from collections import defaultdict RATIONALE_VALUE_REGEX_LIST = [ "<thinking>(.*?)(</thinking>)", "(.*?)(</thinking>)", "(<thinking>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" ASK_USER = "user__askuser" KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the parameter 'question' for user__askuser function call. Please try again with the correct argument added." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = "The tool name format is incorrect. The format for the tool name must be: 'httpVerb__actionGroupName__apiName." logger = logging.getLogger() # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.setLevel("INFO") logger.info("Lambda input: " + str(event)) # Sanitize LLM response response = load_response(event['invokeModelRawResponse']) stop_reason = response["stop_reason"] content = response["content"] content_by_type = get_content_by_type(content) # Parse LLM response for any rationale rationale = parse_rationale(content_by_type) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } match stop_reason: case 'tool_use': # Check if there is an ask user try: ask_user = parse_ask_user(content_by_type) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user, 'id': content_by_type['tool_use'][0]['id'] }, } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(content_by_type, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case 'end_turn' | 'stop_sequence': # Check if there is a final answer try: if content_by_type["text"]: text_contents = content_by_type["text"] for text_content in text_contents: final_answer, generated_response_parts = parse_answer(text_content) if final_answer: parsed_response['orchestrationParsedResponse'][ 'responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response case _: addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response def load_response(text): raw_text = r'{}'.format(text) json_text = json.loads(raw_text) return json_text def get_content_by_type(content): content_by_type = defaultdict(list) for content_value in content: content_by_type[content_value["type"]].append(content_value) return content_by_type def parse_rationale(content_by_type): if "text" in content_by_type: rationale = content_by_type["text"][0]["text"] if rationale is not None: rationale_matcher = next( (pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() return rationale return None def parse_answer(response): if has_generated_response(response["text"].strip()): return parse_generated_response(response) answer_match = ANSWER_PATTERN.search(response["text"].strip()) if answer_match: return answer_match.group(0).strip(), None return None, None def parse_generated_response(response): results = [] for match in ANSWER_PART_PATTERN.finditer(response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(content_by_type): try: if content_by_type["tool_use"][0]["name"] == ASK_USER: ask_user_question = content_by_type["tool_use"][0]["input"]["question"] if not ask_user_question: raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) return ask_user_question except ValueError as ex: raise ex return None def parse_function_call(content_by_type, parsed_response): try: content = content_by_type["tool_use"][0] tool_name = content["name"] action_split = tool_name.split('__') schema_type = 'FUNCTION' if len(action_split) == 2 else 'API' if schema_type == 'API': verb = action_split[0].strip() resource_name = action_split[1].strip() function = action_split[2].strip() else: resource_name = action_split[1].strip() function = action_split[2].strip() except ValueError as ex: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) parameters = {} for param, value in content["input"].items(): parameters[param] = {'value': value} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails'][ 'agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace( KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, ''), 'id': content["id"] } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails'][ 'invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails'][ 'actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters, "id": content["id"] } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
import json import re import logging RATIONALE_REGEX_LIST = [ "(.*?)(<function_call>)", "(.*?)(<answer>)" ] RATIONALE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_REGEX_LIST] RATIONALE_VALUE_REGEX_LIST = [ "<scratchpad>(.*?)(</scratchpad>)", "(.*?)(</scratchpad>)", "(<scratchpad>)(.*?)" ] RATIONALE_VALUE_PATTERNS = [re.compile(regex, re.DOTALL) for regex in RATIONALE_VALUE_REGEX_LIST] ANSWER_REGEX = r"(?<=<answer>)(.*)" ANSWER_PATTERN = re.compile(ANSWER_REGEX, re.DOTALL) ANSWER_TAG = "<answer>" FUNCTION_CALL_TAG = "<function_call>" ASK_USER_FUNCTION_CALL_REGEX = r"(<function_call>user::askuser)(.*)\)" ASK_USER_FUNCTION_CALL_PATTERN = re.compile(ASK_USER_FUNCTION_CALL_REGEX, re.DOTALL) ASK_USER_FUNCTION_PARAMETER_REGEX = r"(?<=askuser=\")(.*?)\"" ASK_USER_FUNCTION_PARAMETER_PATTERN = re.compile(ASK_USER_FUNCTION_PARAMETER_REGEX, re.DOTALL) KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX = "x_amz_knowledgebase_" FUNCTION_CALL_REGEX_API_SCHEMA = r"<function_call>(\w+)::(\w+)::(.+)\((.+)\)" FUNCTION_CALL_REGEX_FUNCTION_SCHEMA = r"<function_call>(\w+)::(.+)\((.+)\)" ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>" ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>" ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>" ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL) ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL) ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL) # You can provide messages to reprompt the LLM in case the LLM output is not in the expected format MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE = "Missing the argument askuser for user::askuser function call. Please try again with the correct argument added" ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE = "The function call format is incorrect. The format for function calls to the askuser function must be: <function_call>user::askuser(askuser=\"$ASK_USER_INPUT\")</function_call>." FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE = 'The function call format is incorrect. The format for function calls must be: <function_call>$FUNCTION_NAME($FUNCTION_ARGUMENT_NAME=""$FUNCTION_ARGUMENT_NAME"")</function_call>.' logger = logging.getLogger() logger.setLevel("INFO") # This parser lambda is an example of how to parse the LLM output for the default orchestration prompt def lambda_handler(event, context): logger.info("Lambda input: " + str(event)) # Sanitize LLM response sanitized_response = sanitize_response(event['invokeModelRawResponse']) # Parse LLM response for any rationale rationale = parse_rationale(sanitized_response) # Construct response fields common to all invocation types parsed_response = { 'promptType': "ORCHESTRATION", 'orchestrationParsedResponse': { 'rationale': rationale } } # Check if there is a final answer try: final_answer, generated_response_parts = parse_answer(sanitized_response) except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response if final_answer: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'FINISH', 'agentFinalResponse': { 'responseText': final_answer } } if generated_response_parts: parsed_response['orchestrationParsedResponse']['responseDetails']['agentFinalResponse']['citations'] = { 'generatedResponseParts': generated_response_parts } logger.info("Final answer parsed response: " + str(parsed_response)) return parsed_response # Check if there is an ask user try: ask_user = parse_ask_user(sanitized_response) if ask_user: parsed_response['orchestrationParsedResponse']['responseDetails'] = { 'invocationType': 'ASK_USER', 'agentAskUser': { 'responseText': ask_user } } logger.info("Ask user parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response # Check if there is an agent action try: parsed_response = parse_function_call(sanitized_response, parsed_response) logger.info("Function call parsed response: " + str(parsed_response)) return parsed_response except ValueError as e: addRepromptResponse(parsed_response, e) return parsed_response addRepromptResponse(parsed_response, 'Failed to parse the LLM output') logger.info(parsed_response) return parsed_response raise Exception("unrecognized prompt type") def sanitize_response(text): pattern = r"(\\n*)" text = re.sub(pattern, r"\n", text) return text def parse_rationale(sanitized_response): # Checks for strings that are not required for orchestration rationale_matcher = next((pattern.search(sanitized_response) for pattern in RATIONALE_PATTERNS if pattern.search(sanitized_response)), None) if rationale_matcher: rationale = rationale_matcher.group(1).strip() # Check if there is a formatted rationale that we can parse from the string rationale_value_matcher = next((pattern.search(rationale) for pattern in RATIONALE_VALUE_PATTERNS if pattern.search(rationale)), None) if rationale_value_matcher: return rationale_value_matcher.group(1).strip() return rationale return None def parse_answer(sanitized_llm_response): if has_generated_response(sanitized_llm_response): return parse_generated_response(sanitized_llm_response) answer_match = ANSWER_PATTERN.search(sanitized_llm_response) if answer_match and is_answer(sanitized_llm_response): return answer_match.group(0).strip(), None return None, None def is_answer(llm_response): return llm_response.rfind(ANSWER_TAG) > llm_response.rfind(FUNCTION_CALL_TAG) def parse_generated_response(sanitized_llm_response): results = [] for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response): part = match.group(1).strip() text_match = ANSWER_TEXT_PART_PATTERN.search(part) if not text_match: raise ValueError("Could not parse generated response") text = text_match.group(1).strip() references = parse_references(sanitized_llm_response, part) results.append((text, references)) final_response = " ".join([r[0] for r in results]) generated_response_parts = [] for text, references in results: generatedResponsePart = { 'text': text, 'references': references } generated_response_parts.append(generatedResponsePart) return final_response, generated_response_parts def has_generated_response(raw_response): return ANSWER_PART_PATTERN.search(raw_response) is not None def parse_references(raw_response, answer_part): references = [] for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part): reference = match.group(1).strip() references.append({'sourceId': reference}) return references def parse_ask_user(sanitized_llm_response): ask_user_matcher = ASK_USER_FUNCTION_CALL_PATTERN.search(sanitized_llm_response) if ask_user_matcher: try: ask_user = ask_user_matcher.group(2).strip() ask_user_question_matcher = ASK_USER_FUNCTION_PARAMETER_PATTERN.search(ask_user) if ask_user_question_matcher: return ask_user_question_matcher.group(1).strip() raise ValueError(MISSING_API_INPUT_FOR_USER_REPROMPT_MESSAGE) except ValueError as ex: raise ex except Exception as ex: raise Exception(ASK_USER_FUNCTION_CALL_STRUCTURE_REMPROMPT_MESSAGE) return None def parse_function_call(sanitized_response, parsed_response): match = re.search(FUNCTION_CALL_REGEX_API_SCHEMA, sanitized_response) match_function_schema = re.search(FUNCTION_CALL_REGEX_FUNCTION_SCHEMA, sanitized_response) if not match and not match_function_schema: raise ValueError(FUNCTION_CALL_STRUCTURE_REPROMPT_MESSAGE) if match: schema_type = 'API' verb, resource_name, function, param_arg = match.group(1), match.group(2), match.group(3), match.group(4) else: schema_type = 'FUNCTION' resource_name, function, param_arg = match_function_schema.group(1), match_function_schema.group(2), match_function_schema.group(3) parameters = {} for arg in param_arg.split(","): key, value = arg.split("=") parameters[key.strip()] = {'value': value.strip('" ')} parsed_response['orchestrationParsedResponse']['responseDetails'] = {} # Function calls can either invoke an action group or a knowledge base. # Mapping to the correct variable names accordingly if schema_type == 'API' and resource_name.lower().startswith(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX): parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'KNOWLEDGE_BASE' parsed_response['orchestrationParsedResponse']['responseDetails']['agentKnowledgeBase'] = { 'searchQuery': parameters['searchQuery'], 'knowledgeBaseId': resource_name.replace(KNOWLEDGE_STORE_SEARCH_ACTION_PREFIX, '') } return parsed_response parsed_response['orchestrationParsedResponse']['responseDetails']['invocationType'] = 'ACTION_GROUP' if schema_type == 'API': parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "verb": verb, "actionGroupName": resource_name, "apiName": function, "actionGroupInput": parameters } else: parsed_response['orchestrationParsedResponse']['responseDetails']['actionGroupInvocation'] = { "actionGroupName": resource_name, "functionName": function, "actionGroupInput": parameters } return parsed_response def addRepromptResponse(parsed_response, error): error_message = str(error) logger.warn(error_message) parsed_response['orchestrationParsedResponse']['parsingErrorDetails'] = { 'repromptResponse': error_message }
Contoh berikut menunjukkan fungsi Lambda parser generasi respons basis pengetahuan yang ditulis Python.
import json
import re
import logging
ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default KB response generation prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
raw_response = event['invokeModelRawResponse']
parsed_response = {
'promptType': 'KNOWLEDGE_BASE_RESPONSE_GENERATION',
'knowledgeBaseResponseGenerationParsedResponse': {
'generatedResponse': parse_generated_response(raw_response)
}
}
logger.info(parsed_response)
return parsed_response
def parse_generated_response(sanitized_llm_response):
results = []
for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
part = match.group(1).strip()
text_match = ANSWER_TEXT_PART_PATTERN.search(part)
if not text_match:
raise ValueError("Could not parse generated response")
text = text_match.group(1).strip()
references = parse_references(sanitized_llm_response, part)
results.append((text, references))
generated_response_parts = []
for text, references in results:
generatedResponsePart = {
'text': text,
'references': references
}
generated_response_parts.append(generatedResponsePart)
return {
'generatedResponseParts': generated_response_parts
}
def parse_references(raw_response, answer_part):
references = []
for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
reference = match.group(1).strip()
references.append({'sourceId': reference})
return references
Generasi respons basis pengetahuan
Contoh berikut menunjukkan fungsi Lambda parser generasi respons basis pengetahuan yang ditulis Python.
import json
import re
import logging
ANSWER_PART_REGEX = "<answer_part\\s?>(.+?)</answer_part\\s?>"
ANSWER_TEXT_PART_REGEX = "<text\\s?>(.+?)</text\\s?>"
ANSWER_REFERENCE_PART_REGEX = "<source\\s?>(.+?)</source\\s?>"
ANSWER_PART_PATTERN = re.compile(ANSWER_PART_REGEX, re.DOTALL)
ANSWER_TEXT_PART_PATTERN = re.compile(ANSWER_TEXT_PART_REGEX, re.DOTALL)
ANSWER_REFERENCE_PART_PATTERN = re.compile(ANSWER_REFERENCE_PART_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default KB response generation prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
raw_response = event['invokeModelRawResponse']
parsed_response = {
'promptType': 'KNOWLEDGE_BASE_RESPONSE_GENERATION',
'knowledgeBaseResponseGenerationParsedResponse': {
'generatedResponse': parse_generated_response(raw_response)
}
}
logger.info(parsed_response)
return parsed_response
def parse_generated_response(sanitized_llm_response):
results = []
for match in ANSWER_PART_PATTERN.finditer(sanitized_llm_response):
part = match.group(1).strip()
text_match = ANSWER_TEXT_PART_PATTERN.search(part)
if not text_match:
raise ValueError("Could not parse generated response")
text = text_match.group(1).strip()
references = parse_references(sanitized_llm_response, part)
results.append((text, references))
generated_response_parts = []
for text, references in results:
generatedResponsePart = {
'text': text,
'references': references
}
generated_response_parts.append(generatedResponsePart)
return {
'generatedResponseParts': generated_response_parts
}
def parse_references(raw_response, answer_part):
references = []
for match in ANSWER_REFERENCE_PART_PATTERN.finditer(answer_part):
reference = match.group(1).strip()
references.append({'sourceId': reference})
return references
Contoh berikut menunjukkan fungsi Lambda parser pasca-pemrosesan ditulis Python.
import json
import re
import logging
FINAL_RESPONSE_REGEX = r"<final_response>([\s\S]*?)</final_response>"
FINAL_RESPONSE_PATTERN = re.compile(FINAL_RESPONSE_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default PostProcessing prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
raw_response = event['invokeModelRawResponse']
parsed_response = {
'promptType': 'POST_PROCESSING',
'postProcessingParsedResponse': {}
}
matcher = FINAL_RESPONSE_PATTERN.search(raw_response)
if not matcher:
raise Exception("Could not parse raw LLM output")
response_text = matcher.group(1).strip()
parsed_response['postProcessingParsedResponse']['responseText'] = response_text
logger.info(parsed_response)
return parsed_response
Pasca-pemrosesan
Contoh berikut menunjukkan fungsi Lambda parser pasca-pemrosesan ditulis Python.
import json
import re
import logging
FINAL_RESPONSE_REGEX = r"<final_response>([\s\S]*?)</final_response>"
FINAL_RESPONSE_PATTERN = re.compile(FINAL_RESPONSE_REGEX, re.DOTALL)
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default PostProcessing prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
raw_response = event['invokeModelRawResponse']
parsed_response = {
'promptType': 'POST_PROCESSING',
'postProcessingParsedResponse': {}
}
matcher = FINAL_RESPONSE_PATTERN.search(raw_response)
if not matcher:
raise Exception("Could not parse raw LLM output")
response_text = matcher.group(1).strip()
parsed_response['postProcessingParsedResponse']['responseText'] = response_text
logger.info(parsed_response)
return parsed_response
Contoh berikut menunjukkan fungsi parser ringkasan memori Lambda ditulis Python.
import re
import logging
SUMMARY_TAG_PATTERN = r'<summary>(.*?)</summary>'
TOPIC_TAG_PATTERN = r'<topic name="(.+?)"\s*>(.+?)</topic>'
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default LTM SUmmarization prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
# Sanitize LLM response
model_response = sanitize_response(event['invokeModelRawResponse'])
if event["promptType"] == "MEMORY_SUMMARIZATION":
return format_response(parse_llm_response(model_response), event["promptType"])
def format_response(topic_summaries, prompt_type):
return {
"promptType": prompt_type,
"memorySummarizationParsedResponse": {
"topicwiseSummaries": topic_summaries
}
}
def parse_llm_response(output: str):
# First extract content within summary tag
summary_match = re.search(SUMMARY_TAG_PATTERN, output, re.DOTALL)
if not summary_match:
raise Exception("Error while parsing summarizer model output, no summary tag found!")
summary_content = summary_match.group(1)
topic_summaries = parse_topic_wise_summaries(summary_content)
return topic_summaries
def parse_topic_wise_summaries(content):
summaries = []
# Then extract content within topic tag
for match in re.finditer(TOPIC_TAG_PATTERN, content, re.DOTALL):
topic_name = match.group(1)
topic_summary = match.group(2).strip()
summaries.append({
'topic': topic_name,
'summary': topic_summary
})
if not summaries:
raise Exception("Error while parsing summarizer model output, no topics found!")
return summaries
def sanitize_response(text):
pattern = r"(\\n*)"
text = re.sub(pattern, r"\n", text)
return text
Ringkasan memori
Contoh berikut menunjukkan fungsi parser ringkasan memori Lambda ditulis Python.
import re
import logging
SUMMARY_TAG_PATTERN = r'<summary>(.*?)</summary>'
TOPIC_TAG_PATTERN = r'<topic name="(.+?)"\s*>(.+?)</topic>'
logger = logging.getLogger()
# This parser lambda is an example of how to parse the LLM output for the default LTM SUmmarization prompt
def lambda_handler(event, context):
logger.info("Lambda input: " + str(event))
# Sanitize LLM response
model_response = sanitize_response(event['invokeModelRawResponse'])
if event["promptType"] == "MEMORY_SUMMARIZATION":
return format_response(parse_llm_response(model_response), event["promptType"])
def format_response(topic_summaries, prompt_type):
return {
"promptType": prompt_type,
"memorySummarizationParsedResponse": {
"topicwiseSummaries": topic_summaries
}
}
def parse_llm_response(output: str):
# First extract content within summary tag
summary_match = re.search(SUMMARY_TAG_PATTERN, output, re.DOTALL)
if not summary_match:
raise Exception("Error while parsing summarizer model output, no summary tag found!")
summary_content = summary_match.group(1)
topic_summaries = parse_topic_wise_summaries(summary_content)
return topic_summaries
def parse_topic_wise_summaries(content):
summaries = []
# Then extract content within topic tag
for match in re.finditer(TOPIC_TAG_PATTERN, content, re.DOTALL):
topic_name = match.group(1)
topic_summary = match.group(2).strip()
summaries.append({
'topic': topic_name,
'summary': topic_summary
})
if not summaries:
raise Exception("Error while parsing summarizer model output, no topics found!")
return summaries
def sanitize_response(text):
pattern = r"(\\n*)"
text = re.sub(pattern, r"\n", text)
return text