本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
設定 Kafka 事件來源的錯誤處理控制項
您可以設定 Lambda 如何處理 Kafka 事件來源映射的錯誤和重試。這些組態可協助您控制 Lambda 如何處理失敗的記錄和管理重試行為。
可用的重試組態
下列重試組態適用於 Amazon MSK 和自我管理 Kafka 事件來源:
-
重試次數上限 – 當函數傳回錯誤時,Lambda 重試次數上限。這不會計入初始調用嘗試。預設值為 -1 (無限)。
-
記錄存留期上限 – Lambda 傳送給函數的記錄存留期上限。預設值為 -1 (無限)。
-
錯誤時分割批次 – 當函數傳回錯誤時,請將批次分割為兩個較小的批次,並個別重試。這有助於隔離有問題的記錄。
-
部分批次回應 – 允許函數傳回批次中哪些記錄處理失敗的資訊,因此 Lambda 只能重試失敗的記錄。
設定錯誤處理控制 (主控台)
您可以在 Lambda 主控台中建立或更新 Kafka 事件來源映射時設定重試行為。
設定 Kafka 事件來源的重試行為 (主控台)
-
開啟 Lambda 主控台中的函數頁面
。 -
選擇您的函數名稱。
-
執行以下任意一項:
-
若要新增新的 Kafka 觸發條件,請在函數概觀下,選擇新增觸發條件。
-
若要修改現有的 Kafka 觸發條件,請選擇觸發條件,然後選擇編輯。
-
-
在事件輪詢器組態下,選取佈建模式以設定錯誤處理控制項:
-
針對重試嘗試,輸入重試嘗試次數上限 (0-10000,或無限輸入 -1)。
-
針對記錄存留期上限,以秒為單位輸入存留期上限 (60-604800,無限輸入 -1)。
-
若要在發生錯誤時啟用批次分割,請選取錯誤時分割批次。
-
若要啟用部分批次回應,請選取 ReportBatchItemFailures。
-
-
選擇新增或儲存。
設定重試行為 (AWS CLI)
使用下列 AWS CLI 命令來設定 Kafka 事件來源映射的重試行為。
使用重試組態建立事件來源映射
下列範例會使用錯誤處理控制項建立自我管理的 Kafka 事件來源映射:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics my-kafka-topic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
對於 Amazon MSK 事件來源:
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSMSKKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
更新重試組態
使用 update-event-source-mapping命令來修改現有事件來源映射的重試組態:
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --maximum-retry-attempts 5 \ --maximum-record-age-in-seconds 7200 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
PartialBatchResponse
部分批次回應也稱為 ReportBatchItemFailures,是 Lambda 與 Kafka 來源整合時處理錯誤的重要功能。如果沒有此功能,當批次中其中一個項目發生錯誤時,會導致重新處理該批次中的所有訊息。啟用並實作部分批次回應後,處理常式只會傳回失敗訊息的識別符,讓 Lambda 只重試這些特定項目。這可讓您更好地控制包含失敗訊息的批次處理方式。
若要報告批次錯誤,您將使用此 JSON 結構描述:
{ "batchItemFailures": [ { "itemIdentifier": { "topic-partition": "topic-partition_number", "offset": 100 } }, ... ] }
重要
如果您傳回空的有效 JSON 或 null,事件來源映射會將批次視為已成功處理。傳回的任何無效 topic-partition_number 或位移若不存在於調用事件中,將視為失敗,並重試整個批次。
下列程式碼範例示範如何為從 Kafka 來源接收事件的 Lambda 函數實作部分批次回應。此函數會在回應中報告批次項目失敗,指示 Lambda 稍後重試這些訊息。
以下是顯示此方法的 Python Lambda 處理常式實作:
import base64 from typing import Any, Dict, List def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]: failures: List[Dict[str, Dict[str, Any]]] = [] records_dict = event.get("records", {}) for topic_partition, records_list in records_dict.items(): for record in records_list: topic = record.get("topic") partition = record.get("partition") offset = record.get("offset") value_b64 = record.get("value") try: data = base64.b64decode(value_b64).decode("utf-8") process_message(data) except Exception as exc: print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}") item_identifier: Dict[str, Any] = { "topic-partition": f"{topic}-{partition}", "offset": int(offset) if offset is not None else None, } failures.append({"itemIdentifier": item_identifier}) return {"batchItemFailures": failures} def process_message(data: str) -> None: # Your business logic for a single message pass
以下是 Node.js 版本:
const { Buffer } = require("buffer"); const handler = async (event) => { const failures = []; for (let topicPartition in event.records) { const records = event.records[topicPartition]; for (const record of records) { const topic = record.topic; const partition = record.partition; const offset = record.offset; const valueBase64 = record.value; const data = Buffer.from(valueBase64, "base64").toString("utf8"); try { await processMessage(data); } catch (error) { console.error("Failed to process record", { topic, partition, offset, error }); const itemIdentifier = { "topic-partition": `${topic}-${partition}`, offset: Number(offset), }; failures.push({ itemIdentifier }); } } } return { batchItemFailures: failures }; }; async function processMessage(payload) { // Your business logic for a single message } module.exports = { handler };