Konfiguration der Fehlerbehandlungssteuerungen für Kafka-Ereignisquellen - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Konfiguration der Fehlerbehandlungssteuerungen für Kafka-Ereignisquellen

Sie können konfigurieren, wie Lambda Fehler und Wiederholungen für Ihre Kafka-Ereignisquellenzuordnungen behandelt. Mit diesen Konfigurationen können Sie steuern, wie Lambda fehlgeschlagene Datensätze verarbeitet und das Wiederholungsverhalten verwaltet.

Verfügbare Wiederholungskonfigurationen

Die folgenden Wiederholungskonfigurationen sind sowohl für Amazon MSK als auch für selbstverwaltete Kafka-Ereignisquellen verfügbar:

  • Maximale Wiederholungsversuche — Die maximale Anzahl von Wiederholungsversuchen von Lambda, wenn Ihre Funktion einen Fehler zurückgibt. Der erste Aufrufversuch wird dabei nicht mitgezählt. Die Standardeinstellung ist -1 (unendlich).

  • Maximales Datensatzalter — Das maximale Alter eines Datensatzes, den Lambda an Ihre Funktion sendet. Die Standardeinstellung ist -1 (unendlich).

  • Batch bei Fehler aufteilen — Wenn Ihre Funktion einen Fehler zurückgibt, teilen Sie den Batch in zwei kleinere Batches auf und versuchen Sie es jeweils einzeln erneut. Dies hilft, problematische Datensätze zu isolieren.

  • Partielle Batch-Antwort — Erlauben Sie Ihrer Funktion, Informationen darüber zurückzugeben, bei welchen Datensätzen in einem Batch die Verarbeitung fehlgeschlagen ist, sodass Lambda nur die fehlgeschlagenen Datensätze erneut versuchen kann.

Konfiguration von Steuerelementen zur Fehlerbehandlung (Konsole)

Sie können das Wiederholungsverhalten konfigurieren, wenn Sie eine Kafka-Ereignisquellenzuordnung in der Lambda-Konsole erstellen oder aktualisieren.

So konfigurieren Sie das Wiederholungsverhalten für eine Kafka-Ereignisquelle (Konsole)
  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie den Namen Ihrer Funktion.

  3. Führen Sie eine der folgenden Aktionen aus:

    • Um einen neuen Kafka-Trigger hinzuzufügen, wählen Sie unter Funktionsübersicht die Option Trigger hinzufügen aus.

    • Um einen vorhandenen Kafka-Trigger zu ändern, wählen Sie den Auslöser aus und klicken Sie dann auf Bearbeiten.

  4. Wählen Sie unter Konfiguration des Event-Pollers den Bereitstellungsmodus aus, um die Kontrollen zur Fehlerbehandlung zu konfigurieren:

    1. Geben Sie für Wiederholungsversuche die maximale Anzahl von Wiederholungsversuchen ein (0-10000 oder -1 für unendlich).

    2. Geben Sie für Maximales Datensatzalter das Höchstalter in Sekunden ein (60-604800 oder -1 für unendlich).

    3. Um die Stapelaufteilung bei Fehlern zu aktivieren, wählen Sie Batch bei Fehler aufteilen aus.

    4. Um die teilweise Batch-Antwort zu aktivieren, wählen Sie ReportBatchItemFailures.

  5. Wählen Sie Hinzufügen oder Speichern.

Konfiguration des Wiederholungsverhaltens ()AWS CLI

Verwenden Sie die folgenden AWS CLI Befehle, um das Wiederholungsverhalten für Ihre Kafka-Ereignisquellenzuordnungen zu konfigurieren.

Erstellen einer Ereignisquellenzuordnung mit Wiederholungskonfigurationen

Im folgenden Beispiel wird eine selbstverwaltete Kafka-Ereignisquellenzuordnung mit Steuerelementen zur Fehlerbehandlung erstellt:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics my-kafka-topic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"

Für Amazon MSK-Ereignisquellen:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSMSKKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]' \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=1 \ --maximum-retry-attempts 3 \ --maximum-record-age-in-seconds 3600 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"

Wiederholungskonfigurationen werden aktualisiert

Verwenden Sie den update-event-source-mapping Befehl, um Wiederholungskonfigurationen für eine bestehende Ereignisquellenzuordnung zu ändern:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --maximum-retry-attempts 5 \ --maximum-record-age-in-seconds 7200 \ --bisect-batch-on-function-error \ --function-response-types "ReportBatchItemFailures"

PartialBatchResponse

Die partielle Batch-Antwort, auch bekannt als ReportBatchItemFailures, ist eine wichtige Funktion für die Fehlerbehandlung bei der Integration von Lambda mit Kafka-Quellen. Ohne diese Funktion führt ein Fehler in einem der Elemente in einem Batch dazu, dass alle Nachrichten in diesem Batch erneut verarbeitet werden. Wenn die partielle Batch-Antwort aktiviert und implementiert ist, gibt der Handler nur Identifikatoren für die fehlgeschlagenen Nachrichten zurück, sodass Lambda nur diese spezifischen Elemente erneut versuchen kann. Dies ermöglicht eine bessere Kontrolle darüber, wie Batches mit fehlgeschlagenen Nachrichten verarbeitet werden.

Um Batchfehler zu melden, verwenden Sie dieses JSON-Schema:

{ "batchItemFailures": [ { "itemIdentifier": { "topic-partition": "topic-partition_number", "offset": 100 } }, ... ] }
Wichtig

Wenn Sie einen leeren, gültigen JSON-Code oder einen Nullwert zurückgeben, betrachtet die Zuordnung der Ereignisquelle einen Batch als erfolgreich verarbeitet. Jede zurückgegebene ungültige Topic-Partition_Number oder Offset, die im aufgerufenen Ereignis nicht enthalten waren, wird als Fehler behandelt, und der gesamte Batch wird erneut versucht.

Die folgenden Codebeispiele zeigen, wie eine partielle Batch-Antwort für Lambda-Funktionen implementiert wird, die Ereignisse aus Kafka-Quellen empfangen. Die Funktion meldet die Batch-Elementfehler in der Antwort und signalisiert Lambda, diese Nachrichten später erneut zu versuchen.

Hier ist eine Python-Lambda-Handler-Implementierung, die diesen Ansatz zeigt:

import base64 from typing import Any, Dict, List def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, List[Dict[str, Dict[str, Any]]]]: failures: List[Dict[str, Dict[str, Any]]] = [] records_dict = event.get("records", {}) for topic_partition, records_list in records_dict.items(): for record in records_list: topic = record.get("topic") partition = record.get("partition") offset = record.get("offset") value_b64 = record.get("value") try: data = base64.b64decode(value_b64).decode("utf-8") process_message(data) except Exception as exc: print(f"Failed to process record topic={topic} partition={partition} offset={offset}: {exc}") item_identifier: Dict[str, Any] = { "topic-partition": f"{topic}-{partition}", "offset": int(offset) if offset is not None else None, } failures.append({"itemIdentifier": item_identifier}) return {"batchItemFailures": failures} def process_message(data: str) -> None: # Your business logic for a single message pass

Hier ist eine Version von Node.js:

const { Buffer } = require("buffer"); const handler = async (event) => { const failures = []; for (let topicPartition in event.records) { const records = event.records[topicPartition]; for (const record of records) { const topic = record.topic; const partition = record.partition; const offset = record.offset; const valueBase64 = record.value; const data = Buffer.from(valueBase64, "base64").toString("utf8"); try { await processMessage(data); } catch (error) { console.error("Failed to process record", { topic, partition, offset, error }); const itemIdentifier = { "topic-partition": `${topic}-${partition}`, offset: Number(offset), }; failures.push({ itemIdentifier }); } } } return { batchItemFailures: failures }; }; async function processMessage(payload) { // Your business logic for a single message } module.exports = { handler };