Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Verarbeitung selbstverwalteter Apache Kafka-Nachrichten mit Lambda
Anmerkung
Wenn Sie Daten an ein anderes Ziel als eine Lambda-Funktion senden oder die Daten vor dem Senden anreichern möchten, finden Sie weitere Informationen unter Amazon EventBridge Pipes.
Themen
Hinzufügen eines Kafka-Clusters als Ereignisquelle
Um eine Ereignisquellenzuordnung zu erstellen, fügen Sie Ihren Kafka-Cluster mithilfe der Lambda-Konsole, an oder () als AWS SDKLambda-Funktionstrigger hinzu.AWS Command Line InterfaceAWS CLI
In diesem Abschnitt wird beschrieben, wie Sie mit der Lambda-Konsole und AWS CLI ein Ereignisquellen-Zuweisung erstellen.
Voraussetzungen
-
Selbstverwaltetes Apache-Kafka-Cluster. Lambda unterstützt Apache Kafka Version 0.10.1.0 und höher.
-
Eine Ausführungsrolle mit der Berechtigung, auf die AWS Ressourcen zuzugreifen, die Ihr selbstverwalteter Kafka-Cluster verwendet.
Anpassbare Konsumentengruppen-ID
Wenn Sie Kafka als Ereignisquelle einrichten, können Sie eine Konsumentengruppen-ID angeben. Diese Konsumentengruppen-ID ist eine vorhandene Kennung für die Kafka-Konsumentengruppe, der Ihre Lambda-Funktion beitreten soll. Mit diesem Feature können Sie alle laufenden Kafka-Datensatzverarbeitungs-Setups nahtlos von anderen Verbrauchern auf Lambda migrieren.
Wenn Sie eine Konsumentengruppen-ID angeben und sie innerhalb dieser Konsumentengruppe weitere aktive Poller gibt, verteilt Kafka Nachrichten an alle Konsumenten. Mit anderen Worten, Lambda erhält nicht alle Nachrichten zum Thema Kafka. Wenn Sie möchten, dass Lambda alle Nachrichten im Thema verarbeitet, deaktivieren Sie alle anderen Poller in dieser Konsumentengruppe.
Wenn Sie außerdem eine Konsumentengruppen-ID angeben und Kafka eine gültige vorhandene Konsumentengruppe mit derselben ID findet, ignoriert Lambda die StartingPosition
-Parameter für Ihre Ereignisquellen-Zuweisung. Stattdessen beginnt Lambda mit der Verarbeitung von Datensätzen gemäß dem zugesagten Versatz der Konsumentengruppe. Wenn Sie eine Konsumentengruppen-ID angeben und Kafka keine vorhandene Konsumentengruppe finden kann, konfiguriert Lambda Ihre Ereignisquelle mit der angegebenen StartingPosition
.
Die Konsumentengruppen-ID, die Sie angeben, muss unter all Ihren Kafka-Ereignisquellen eindeutig sein. Nachdem Sie eine Kafka-Ereignisquellenzuordnung mit der angegebenen Konsumentengruppen-ID erstellt haben, können Sie diesen Wert nicht aktualisieren.
Hinzufügen eines selbstverwalteten Kafka-Clusters (Konsole)
Befolgen Sie diese Schritte, um Ihren selbstverwalteten Apache-Kafka-Cluster und ein Kafka-Thema als Auslöser für Ihre Lambda-Funktion hinzuzufügen.
So fügen Sie Ihrer Lambda-Funktion (Konsole) einen Apache-Kafka-Auslöser hinzu
-
Öffnen Sie die Seite Funktionen
der Lambda-Konsole. -
Wählen Sie den Namen Ihrer Lambda-Funktion aus.
-
Wählen Sie unter Function overview (Funktionsübersicht) die Option Add trigger (Trigger hinzufügen).
-
Führen Sie unter Auslöser-Konfiguration die folgenden Schritte aus:
-
Wählen Sie den Apache-Kafka-Auslösertyp.
-
Geben Sie für Bootstrap-Server die Host- und Portpaaradresse eines Kafka-Brokers in Ihrem Cluster ein und wählen Sie dann Hinzufügen. Wiederholen Sie den Vorgang für jeden Kafka-Broker im Cluster.
-
Geben Sie für Themenname den Namen des Kafka-Themas ein, das zum Speichern von Datensätzen im Cluster verwendet wird.
-
(Optional) Geben Sie für Batchgröße die maximale Anzahl von Datensätzen ein, die in einem einzelnen Batch empfangen werden sollen.
-
Geben Sie für Batch window (Batch-Fenster) die maximale Zeit in Sekunden ein, die Lambda mit dem Sammeln von Datensätzen verbringt, bevor die Funktion aufgerufen wird.
-
(Optional) Geben Sie für Konsumentengruppen-ID die ID einer Kafka-Konsumentengruppe ein, der Sie beitreten möchten.
-
(Optional) Wählen Sie für Startposition die Option Neueste, um mit dem Lesen des Streams aus dem letzten Datensatz zu beginnen, Horizont trimmen, um mit dem frühesten verfügbaren Datensatz zu beginnen, oder Am Zeitstempel, um einen Zeitstempel anzugeben, ab dem mit dem Lesen begonnen werden soll.
-
(Optional) Wählen Sie Amazon VPC für VPCIhren Kafka-Cluster aus. Wählen Sie dann die VPCSubnetze und VPCSicherheitsgruppen aus.
Diese Einstellung ist erforderlich, wenn nur Benutzer in Ihrem Umkreis VPC auf Ihre Broker zugreifen.
-
(Optional) Wählen Sie bei Authentication (Authentifizierung) die Option Add (Hinzufügen) aus und gehen Sie dann folgendermaßen vor:
-
Wählen Sie das Zugriffs- oder Authentifizierungsprotokoll der Kafka-Broker in Ihrem Cluster aus.
-
Wenn Ihr Kafka-Broker die PLAIN Authentifizierung SASL /verwendet, wählen Sie BASIC_ AUTH.
-
Wenn Ihr Broker die SCRAM AuthentifizierungSASL/verwendet, wählen Sie eines der SCRAM Protokolle SASL_.
-
Wenn Sie die TLS M-Authentifizierung konfigurieren, wählen Sie das AUTH Protokoll CLIENTCERTIFICATE_ TLS _ _.
-
-
Wählen Sie für die TLS AuthentifizierungSASL/SCRAModer m den geheimen Schlüssel von Secrets Manager, der die Anmeldeinformationen für Ihren Kafka-Cluster enthält.
-
-
(Optional) Wählen Sie für Verschlüsselung das Secrets Manager-Geheimnis aus, das das Root-CA-Zertifikat enthält, das Ihre Kafka-Broker für die TLS Verschlüsselung verwenden, wenn Ihre Kafka-Broker Zertifikate verwenden, die von einer privaten CA signiert wurden.
Diese Einstellung gilt für die TLS Verschlüsselung fürSASL/SCRAModerSASL/PLAINund für die M-Authentifizierung. TLS
-
Um den Auslöser zu Testzwecken in einem deaktivierten Zustand zu erstellen (empfohlen), deaktivieren Sie Auslöser aktivieren. Um den Auslöser sofort zu aktivieren, wählen Sie Auslöser aktivieren.
-
-
Wählen Sie hinzufügen aus, um den Auslöser zu erstellen.
Selbstverwaltetes Apache-Kafka-Cluster hinzufügen (AWS CLI)
Verwenden Sie die folgenden AWS CLI Beispielbefehle, um einen selbstverwalteten Apache Kafka-Trigger für Ihre Lambda-Funktion zu erstellen und anzuzeigen.
Verwenden SASL von/SCRAM
Wenn Kafka-Benutzer über das Internet auf Ihre Kafka-Broker zugreifen, geben Sie das Secrets Manager Manager-Geheimnis an, das Sie für SASL SCRAM /authentication erstellt haben. Im folgenden Beispiel wird der create-event-source-mappingmy-kafka-function
zuzuordnen. AWSKafkaTopic
aws lambda create-event-source-mapping \ --topics
AWSKafkaTopic
\ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333
:secret:MyBrokerSecretName
\ --function-name arn:aws:lambda:us-east-1:111122223333
:function:my-kafka-function
\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092
", "abc2.xyz.com:9092
"]}}'
Mit einem VPC
Wenn nur Kafka-Benutzer in Ihrem Umfeld VPC auf Ihre Kafka-Broker zugreifen, müssen Sie Ihre VPC Subnetze und VPC Ihre Sicherheitsgruppe angeben. Im folgenden Beispiel wird der create-event-source-mappingmy-kafka-function
zuzuordnen. AWSKafkaTopic
aws lambda create-event-source-mapping \ --topics
AWSKafkaTopic
\ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333
:function:my-kafka-function
\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092
", "abc2.xyz.com:9092
"]}}'
Den Status mit dem anzeigen AWS CLI
Im folgenden Beispiel wird der get-event-source-mapping
aws lambda get-event-source-mapping --uuid
dh38738e-992b-343a-1077-3478934hjkfd7
Selbstverwalteter Apache-Kafka-Konfigurationsparameter
Alle Lambda-Ereignisquellentypen verwenden dieselben CreateEventSourceMappingUpdateEventSourceMappingAPIAND-Operationen. Allerdings gelten nur einige der Parameter für Apache Kafka.
Parameter | Erforderlich | Standard | Hinweise |
---|---|---|---|
BatchSize |
N |
100 |
Höchstwert: 10 000. |
Enabled |
N |
Aktiviert |
Keine |
FunctionName |
Y |
N/A |
Keine |
FilterCriteria |
N |
N/A |
Steuern Sie, welche Ereignisse Lambda an Ihre Funktion sendet |
MaximumBatchingWindowInSeconds |
N |
500 ms |
|
SelfManagedEventSource |
Y |
N/A |
Liste der Kafka Broker. Kann nur auf „Erstellen“ festgelegt werden |
SelfManagedKafkaEventSourceConfig |
N |
Enthält das ConsumerGroupId Feld, das standardmäßig einen eindeutigen Wert hat. |
Kann nur auf „Erstellen“ festgelegt werden |
SourceAccessConfigurations |
N |
Keine Anmeldedaten |
VPCInformationen oder Anmeldeinformationen für den Cluster Legen Sie für SASL _ PLAIN den Wert auf BASIC _ fest AUTH |
StartingPosition |
Y |
N/A |
AT_ TIMESTAMPHORIZON, TRIM _ oder LATEST Kann nur auf „Erstellen“ festgelegt werden |
StartingPositionTimestamp |
N |
N/A |
Erforderlich, wenn auf StartingPosition AT_ gesetzt ist TIMESTAMP |
Themen |
Y |
N/A |
Topic-Name Kann nur auf „Erstellen“ festgelegt werden |
Verwenden eines Kafka-Clusters als Ereignisquelle
Wenn Sie Ihren Apache Kafka- oder MSK Amazon-Cluster als Auslöser für Ihre Lambda-Funktion hinzufügen, wird der Cluster als Ereignisquelle verwendet.
Lambda liest Ereignisdaten aus den Kafka-Themen, die Sie wie Topics
in einer CreateEventSourceMappingAnfrage angeben, basierend auf den StartingPosition
von Ihnen angegebenen Themen. Nach erfolgreicher Verarbeitung wird Ihr Kafka-Thema Ihrem Kafka-Cluster zugeordnet.
Wenn Sie die StartingPosition
als LATEST
angeben, beginnt Lambda mit dem Lesen der neuesten Nachricht in jeder zum Thema gehörenden Partition. Da es nach der Auslöserkonfiguration eine gewisse Verzögerung geben kann, bevor Lambda mit dem Lesen der Nachrichten beginnt, liest Lambda keine Nachrichten, die innerhalb dieses Zeitraums erzeugt wurden.
Lambda verarbeitet Datensätze von einer oder mehreren Kafka-Themenpartitionen, die Sie angeben, und sendet eine JSON Nutzlast an Ihre Funktion. Wenn mehr Datensätze verfügbar sind, setzt Lambda die Verarbeitung von Datensätzen stapelweise fort, basierend auf dem BatchSize
Wert, den Sie in einer CreateEventSourceMappingAnfrage angeben, bis Ihre Funktion das Thema eingeholt hat.
Wenn Ihre Funktion einen Fehler für eine der Nachrichten in einem Batch zurückgibt, wiederholt Lambda den gesamten Nachrichtenbatch, bis die Verarbeitung erfolgreich ist oder die Nachrichten ablaufen. Sie können Datensätze, bei denen alle Wiederholungsversuche fehlschlagen, zur späteren Verarbeitung an ein Ziel senden, wenn ein Fehler aufgetreten ist.
Anmerkung
Während Lambda-Funktionen in der Regel ein maximales Timeout-Limit von 15 Minuten haben, unterstützen Ereignisquellenzuordnungen für Amazon, selbstverwaltetes Apache KafkaMSK, Amazon DocumentDB und Amazon MQ für ActiveMQ und RabbitMQ nur Funktionen mit einem maximalen Timeout-Limit von 14 Minuten. Diese Einschränkung stellt sicher, dass die Ereignisquellenzuordnung Funktionsfehler und Wiederholungsversuche ordnungsgemäß verarbeiten kann.
Startpositionen für Abfragen und Streams
Beachten Sie, dass die Stream-Abfrage bei der Erstellung und Aktualisierung der Zuordnung von Ereignisquellen letztendlich konsistent ist.
-
Bei der Erstellung der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis mit der Abfrage von Ereignissen aus dem Stream begonnen wird.
-
Bei Aktualisierungen der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis die Abfrage von Ereignissen aus dem Stream gestoppt und neu gestartet wird.
Dieses Verhalten bedeutet, dass, wenn Sie LATEST
als Startposition für den Stream angeben, die Zuordnung von Ereignisquellen bei der Erstellung oder Aktualisierung möglicherweise Ereignisse übersieht. Um sicherzustellen, dass keine Ereignisse übersehen werden, geben Sie die Startposition des Streams als TRIM_HORIZON
oder AT_TIMESTAMP
an.
Automatische Skalierung der Kafka-Ereignisquelle
Wenn Sie anfänglich eine Apache-Kafka-Ereignisquelle erstellen, weist Lambda einen Konsumenten zur Verarbeitung aller Partitionen im Kafka-Thema zu. Jeder Verbraucher hat mehrere Prozessoren, die parallel laufen, um erhöhte Workloads zu bewältigen. Lambda skaliert außerdem je nach Workload automatisch die Anzahl der Verbraucher nach oben oder unten. Um die Nachrichtenreihenfolge in jeder Partition beizubehalten, ist die maximale Anzahl von Verbrauchern pro Partition im Thema ein Verbraucher pro Partition.
In Intervallen von einer Minute wertet Lambda die Verbraucher-Offsetverzögerung aller Partitionen im Thema aus. Wenn die Verzögerung zu hoch ist, empfängt die Partition Nachrichten schneller als Lambda sie verarbeiten kann. Bei Bedarf fügt Lambda dem Thema Verbraucher hinzu oder entfernt sie. Der Skalierungsprozess zum Hinzufügen oder Entfernen von Verbrauchern erfolgt innerhalb von drei Minuten nach der Bewertung.
Wenn Ihre Lambda-Zielfunktion überlastet ist, verringert Lambda die Anzahl der Verbraucher. Diese Aktion reduziert die Workload für die Funktion, indem die Anzahl der Nachrichten reduziert wird, die Verbraucher abrufen und an die Funktion senden können.
Um den Durchsatz Ihres Kafka-Themas zu überwachen, können Sie die Apache Kafka-Verbrauchermetriken wie consumer_lag
und consumer_offset
anzeigen. Um zu überprüfen, wie viele Funktionsaufrufe parallel erfolgen, können Sie auch die Parallelitätsmetriken für Ihre Funktion überwachen.
CloudWatch Amazon-Metriken
Lambda gibt die Metrik OffsetLag
aus, während Ihre Funktion Datensätze verarbeitet. Der Wert dieser Metrik ist die Differenz (der Versatz) zwischen dem letzten Datensatz, der ins Kafka-Ereignisquellen-Thema geschrieben wurde, und dem letzten Datensatz, den die Konsumentengruppe Ihrer Funktion verarbeitet hat. Sie können mit OffsetLag
die Latenz zwischen dem Hinzufügen eines Datensatzes und der Verarbeitung des Datensatzes durch Ihre Konsumentengruppe abschätzen.
Ein zunehmender Trend in OffsetLag
kann auf Probleme mit Pollern in der Konsumentengruppe Ihrer Funktion hinweisen. Weitere Informationen finden Sie unter Metriken für Lambda-Funktionen anzeigen.