Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Configurazione dei controlli di gestione degli errori per le sorgenti di eventi Kafka
Puoi configurare il modo in cui Lambda gestisce gli errori e i nuovi tentativi per le mappature delle sorgenti degli eventi Kafka. Queste configurazioni consentono di controllare il modo in cui Lambda elabora i record non riusciti e gestisce il comportamento dei tentativi.
Configurazioni di nuovi tentativi disponibili
Le seguenti configurazioni di riprova sono disponibili sia per Amazon MSK che per le sorgenti di eventi Kafka autogestite:
-
Numero massimo di tentativi: il numero massimo di tentativi Lambda quando la funzione restituisce un errore. Questo non conta il tentativo di invocazione iniziale. L'impostazione predefinita è -1 (infinito).
-
Età massima di registrazione: l'età massima di un record che Lambda invia alla tua funzione. L'impostazione predefinita è -1 (infinito).
-
Suddividi il batch in caso di errore: quando la funzione restituisce un errore, dividi il batch in due batch più piccoli e riprova ciascuno separatamente. Questo aiuta a isolare i record problematici.
-
Risposta batch parziale: consenti alla funzione di restituire informazioni sui record di un batch che non è riuscita a elaborare, in modo che Lambda possa riprovare solo i record non riusciti.
Configurazione dei controlli di gestione degli errori (console)
Puoi configurare il comportamento dei tentativi durante la creazione o l'aggiornamento di una mappatura della sorgente degli eventi Kafka nella console Lambda.
Per configurare il comportamento di ripetizione dei tentativi per una sorgente di eventi Kafka (console)
-
Aprire la pagina Funzioni
della console Lambda. -
Scegliete il nome della funzione.
-
Esegui una delle seguenti operazioni:
-
Per aggiungere un nuovo trigger Kafka, in Panoramica delle funzioni, scegli Aggiungi trigger.
-
Per modificare un trigger Kafka esistente, scegli il trigger, quindi scegli Modifica.
-
-
In Event poller configuration, seleziona la modalità provisioned per configurare i controlli di gestione degli errori:
-
Per Riprovare, inserisci il numero massimo di tentativi (0-10000 o -1 per un numero infinito).
-
Per Durata massima di registrazione, inserisci l'età massima in secondi (60-604800 o -1 per un numero infinito).
-
Per abilitare la suddivisione in batch quando si verificano errori, seleziona Dividi batch in caso di errore.
-
Per abilitare la risposta parziale in batch, selezionare ReportBatchItemFailures.
-
-
Scegliete Aggiungi o Salva.
Configurazione del comportamento dei tentativi ()AWS CLI
Utilizzate i seguenti AWS CLI comandi per configurare il comportamento dei tentativi di ripetizione per le mappature delle sorgenti degli eventi Kafka.
Creazione di una mappatura delle sorgenti degli eventi con configurazioni di nuovo tentativo
L'esempio seguente crea una mappatura delle sorgenti degli eventi Kafka autogestita con controlli di gestione degli errori:
aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics my-kafka-topic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
Per le sorgenti di eventi Amazon MSK:
aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSMSKKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
Aggiornamento delle configurazioni dei tentativi
Usa il update-event-source-mapping comando per modificare le configurazioni dei tentativi per una mappatura dell'origine degli eventi esistente:
aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --maximum-retry-attempts 5 \ --maximum-record-age-in-seconds 7200 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"
PartialBatchResponse
La risposta parziale in batch, nota anche come ReportBatchItemFailures, è una funzionalità chiave per la gestione degli errori nell'integrazione di Lambda con i sorgenti Kafka. Senza questa funzionalità, quando si verifica un errore in uno degli elementi di un batch, comporta la rielaborazione di tutti i messaggi in quel batch. Con la risposta batch parziale abilitata e implementata, il gestore restituisce gli identificatori solo per i messaggi non riusciti, permettendo a Lambda di riprovare solo quegli elementi specifici. Ciò fornisce un maggiore controllo sul modo in cui vengono elaborati i batch contenenti messaggi non riusciti.
Per segnalare gli errori relativi ai batch, utilizzerai questo schema JSON:
{ "batchItemFailures": [ { "itemIdentifier": { "topic-partition": "topic-partition_number", "offset": 100 } }, ... ] }
Importante
Se restituisci un codice JSON o null valido vuoto, la mappatura dell'origine dell'evento considererà un batch come elaborato correttamente. Qualsiasi topic-partition_number o offset non valido restituito che non era presente nell'evento richiamato verrà considerato un errore e l'intero batch verrà riprovato.
I seguenti esempi di codice mostrano come implementare una risposta batch parziale per le funzioni Lambda che ricevono eventi da sorgenti Kafka. La funzione riporta gli errori degli elementi batch nella risposta, segnalando a Lambda di riprovare tali messaggi in un secondo momento.
Ecco un'implementazione del gestore Python Lambda che mostra questo approccio:
import base64 from typing import Any, Dict, List def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]: failures: List[Dict[str, Dict[str, Any]]] = [] records_dict = event.get("records", {}) for topic_partition, records_list in records_dict.items(): for record in records_list: topic = record.get("topic") partition = record.get("partition") offset = record.get("offset") value_b64 = record.get("value") try: data = base64.b64decode(value_b64).decode("utf-8") process_message(data) except Exception as exc: print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}") item_identifier: Dict[str, Any] = { "topic-partition": f"{topic}-{partition}", "offset": int(offset) if offset is not None else None, } failures.append({"itemIdentifier": item_identifier}) return {"batchItemFailures": failures} def process_message(data: str) -> None: # Your business logic for a single message pass
Ecco una versione di Node.js:
const { Buffer } = require("buffer"); const handler = async (event) => { const failures = []; for (let topicPartition in event.records) { const records = event.records[topicPartition]; for (const record of records) { const topic = record.topic; const partition = record.partition; const offset = record.offset; const valueBase64 = record.value; const data = Buffer.from(valueBase64, "base64").toString("utf8"); try { await processMessage(data); } catch (error) { console.error("Failed to process record", { topic, partition, offset, error }); const itemIdentifier = { "topic-partition": `${topic}-${partition}`, offset: Number(offset), }; failures.push({ itemIdentifier }); } } } return { batchItemFailures: failures }; }; async function processMessage(payload) { // Your business logic for a single message } module.exports = { handler };