Verarbeitung von MSK Amazon-Nachrichten mit Lambda - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verarbeitung von MSK Amazon-Nachrichten mit Lambda

Anmerkung

Wenn Sie Daten an ein anderes Ziel als eine Lambda-Funktion senden oder die Daten vor dem Senden anreichern möchten, finden Sie weitere Informationen unter Amazon EventBridge Pipes.

Amazon MSK als Eventquelle hinzufügen

Um eine Ereignisquellenzuordnung zu erstellen, fügen Sie Amazon mithilfe der Lambda-Konsole, an oder AWS Command Line Interface () MSK AWS CLI als AWS SDKLambda-Funktionstrigger hinzu. Beachten Sie, dass Lambda beim Hinzufügen von Amazon MSK als Trigger die VPC Einstellungen des MSK Amazon-Clusters annimmt, nicht die Einstellungen der Lambda-Funktion. VPC

In diesem Abschnitt wird beschrieben, wie Sie mit der Lambda-Konsole und AWS CLI ein Ereignisquellen-Zuweisung erstellen.

Voraussetzungen

  • Ein MSK Amazon-Cluster und ein Kafka-Thema. Weitere Informationen finden Sie unter Erste Schritte mit Amazon MSK im Amazon Managed Streaming for Apache Kafka Developer Guide.

  • Eine Ausführungsrolle mit der Berechtigung, auf die AWS Ressourcen zuzugreifen, die Ihr MSK Cluster verwendet.

Anpassbare Konsumentengruppen-ID

Wenn Sie Kafka als Ereignisquelle einrichten, können Sie eine Konsumentengruppen-ID angeben. Diese Konsumentengruppen-ID ist eine vorhandene Kennung für die Kafka-Konsumentengruppe, der Ihre Lambda-Funktion beitreten soll. Mit diesem Feature können Sie alle laufenden Kafka-Datensatzverarbeitungs-Setups nahtlos von anderen Verbrauchern auf Lambda migrieren.

Wenn Sie eine Konsumentengruppen-ID angeben und sie innerhalb dieser Konsumentengruppe weitere aktive Poller gibt, verteilt Kafka Nachrichten an alle Konsumenten. Mit anderen Worten, Lambda erhält nicht alle Nachrichten zum Thema Kafka. Wenn Sie möchten, dass Lambda alle Nachrichten im Thema verarbeitet, deaktivieren Sie alle anderen Poller in dieser Konsumentengruppe.

Wenn Sie außerdem eine Konsumentengruppen-ID angeben und Kafka eine gültige vorhandene Konsumentengruppe mit derselben ID findet, ignoriert Lambda die StartingPosition-Parameter für Ihre Ereignisquellen-Zuweisung. Stattdessen beginnt Lambda mit der Verarbeitung von Datensätzen gemäß dem zugesagten Versatz der Konsumentengruppe. Wenn Sie eine Konsumentengruppen-ID angeben und Kafka keine vorhandene Konsumentengruppe finden kann, konfiguriert Lambda Ihre Ereignisquelle mit der angegebenen StartingPosition.

Die Konsumentengruppen-ID, die Sie angeben, muss unter all Ihren Kafka-Ereignisquellen eindeutig sein. Nachdem Sie eine Kafka-Ereignisquellenzuordnung mit der angegebenen Konsumentengruppen-ID erstellt haben, können Sie diesen Wert nicht aktualisieren.

Hinzufügen eines MSK Amazon-Triggers (Konsole)

Gehen Sie wie folgt vor, um Ihren MSK Amazon-Cluster und ein Kafka-Thema als Auslöser für Ihre Lambda-Funktion hinzuzufügen.

So fügen Sie Ihrer Lambda-Funktion (Konsole) einen MSK Amazon-Trigger hinzu
  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie den Namen Ihrer Lambda-Funktion aus.

  3. Wählen Sie unter Function overview (Funktionsübersicht) die Option Add trigger (Trigger hinzufügen).

  4. Führen Sie unter Auslöser-Konfiguration die folgenden Schritte aus:

    1. Wählen Sie den MSK-Auslösertyp.

    2. Wählen Sie für MSKCluster Ihren Cluster aus.

    3. Geben Sie für Batchgröße die maximale Anzahl von Nachrichten ein, die in einem einzelnen Batch empfangen werden sollen.

    4. Geben Sie für Batch window (Batch-Fenster) die maximale Zeit in Sekunden ein, die Lambda mit dem Sammeln von Datensätzen verbringt, bevor die Funktion aufgerufen wird.

    5. Geben Sie für Themenname den Namen eines Kafka-Themas ein.

    6. (Optional) Geben Sie für Konsumentengruppen-ID die ID einer Kafka-Konsumentengruppe ein, der Sie beitreten möchten.

    7. (Optional) Wählen Sie für Startposition die Option Neueste, um mit dem Lesen des Streams aus dem letzten Datensatz zu beginnen, Horizont trimmen, um mit dem frühesten verfügbaren Datensatz zu beginnen, oder Am Zeitstempel, um einen Zeitstempel anzugeben, ab dem mit dem Lesen begonnen werden soll.

    8. (Optional) Wählen Sie für die Authentifizierung den geheimen Schlüssel für die Authentifizierung bei den Brokern in Ihrem MSK Cluster aus.

    9. Um den Auslöser zu Testzwecken in einem deaktivierten Zustand zu erstellen (empfohlen), deaktivieren Sie Auslöser aktivieren. Um den Auslöser sofort zu aktivieren, wählen Sie Auslöser aktivieren.

  5. Wählen Sie hinzufügen aus, um den Auslöser zu erstellen.

Hinzufügen eines MSK Amazon-Triggers (AWS CLI)

Verwenden Sie die folgenden AWS CLI Beispielbefehle, um einen MSK Amazon-Trigger für Ihre Lambda-Funktion zu erstellen und anzuzeigen.

Erstellen eines Triggers mit dem AWS CLI

Beispiel — Erstellen Sie eine Ereignisquellenzuordnung für einen Cluster, der IAM Authentifizierung verwendet

Im folgenden Beispiel wird der create-event-source-mapping AWS CLI Befehl verwendet, um eine Lambda-Funktion mit dem Namen einem Kafka-Thema namens my-kafka-function zuzuordnen. AWSKafkaTopic Die Ausgangsposition des Themas ist auf LATEST festgelegt. Wenn der Cluster die IAMrollenbasierte Authentifizierung verwendet, benötigen Sie kein Objekt. SourceAccessConfiguration Beispiel:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Beispiel — Erstellen Sie eine Ereignisquellenzuordnung für einen Cluster, der die /-Authentifizierung verwendet SASL SCRAM

Wenn der Cluster die SCRAMAuthentifizierungSASL/verwendet, müssen Sie ein SourceAccessConfigurationObjekt angeben, das ein Secrets Manager-Geheimnis spezifiziert, SASL_SCRAM_512_AUTH und ein Secrets Manager Manager-Geheimnis angebenARN.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
Beispiel — Erstellen Sie eine Ereignisquellenzuordnung für einen Cluster, der die TLS M-Authentifizierung verwendet

Wenn der Cluster die TLSM-Authentifizierung verwendet, müssen Sie ein SourceAccessConfigurationObjekt angeben, das einen Secrets Manager Manager-Schlüssel spezifiziert CLIENT_CERTIFICATE_TLS_AUTHARN.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

Weitere Informationen finden Sie in der CreateEventSourceMappingAPIReferenzdokumentation.

Den Status mit dem anzeigen AWS CLI

Im folgenden Beispiel wird der get-event-source-mapping AWS CLI Befehl verwendet, um den Status der von Ihnen erstellten Ereignisquellenzuordnung zu beschreiben.

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

MSKAmazon-Konfigurationsparameter

Alle Lambda-Ereignisquellentypen verwenden dieselben CreateEventSourceMappingUpdateEventSourceMappingAPIAND-Operationen. Allerdings gelten nur einige der Parameter für AmazonMSK.

Parameter Erforderlich Standard Hinweise

AmazonManagedKafkaEventSourceConfig

N

Enthält das ConsumerGroupId Feld, das standardmäßig einen eindeutigen Wert hat.

Kann nur auf „Erstellen“ festgelegt werden

BatchSize

N

100

Höchstwert: 10 000.

Enabled

N

Aktiviert

Keine

EventSourceArn

Y

N/A

Kann nur auf „Erstellen“ festgelegt werden

FunctionName

Y

N/A

Keine

FilterCriteria

N

N/A

Steuern Sie, welche Ereignisse Lambda an Ihre Funktion sendet

MaximumBatchingWindowInSeconds

N

500 ms

Batching-Verhalten

SourceAccessConfigurations

N

Keine Anmeldedaten

SASL/SCRAModer CLIENT _ _ CERTIFICATE TLS _ AUTH (GegenseitigeTLS) Authentifizierungsdaten für Ihre Ereignisquelle

StartingPosition

Y

N/A

AT_TIMESTAMP, TRIM _HORIZON, oder LATEST

Kann nur auf „Erstellen“ festgelegt werden

StartingPositionTimestamp

N

N/A

Erforderlich, wenn StartingPosition es auf AT_ gesetzt ist TIMESTAMP

Themen

Y

N/A

Kafka-Thema-Name

Kann nur auf „Erstellen“ festgelegt werden

Erstellen von kontenübergreifenden Zuordnungen von Ereignisquellen

Sie können VPCMulti-Private-Konnektivität verwenden, um eine Lambda-Funktion mit einem bereitgestellten MSK Cluster in einem anderen zu verbinden. AWS-Konto Verwendung mehrerer VPC Konnektivität AWS PrivateLink, wodurch der gesamte Datenverkehr im AWS Netzwerk bleibt.

Anmerkung

Sie können keine kontenübergreifenden Zuordnungen von Ereignisquellen für serverlose Cluster erstellen. MSK

Um eine kontenübergreifende Zuordnung von Ereignisquellen zu erstellen, müssen Sie zunächst die Multikonnektivität für den Cluster konfigurieren. VPC MSK Wenn Sie die Ereignisquellenzuordnung erstellen, verwenden Sie die verwaltete VPC Verbindung ARN anstelle des ClustersARN, wie in den folgenden Beispielen gezeigt. Der CreateEventSourceMappingVorgang unterscheidet sich auch je nachdem, welchen Authentifizierungstyp der MSK Cluster verwendet.

Beispiel — Erstellen Sie eine kontenübergreifende Ereignisquellenzuordnung für einen Cluster, der Authentifizierung verwendet IAM

Wenn der Cluster die IAMrollenbasierte Authentifizierung verwendet, benötigen Sie kein Objekt. SourceAccessConfiguration Beispiel:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Beispiel — Erstellen Sie eine kontenübergreifende Zuordnung von Ereignisquellen für einen Cluster, der die /-Authentifizierung verwendet SASL SCRAM

Wenn der Cluster die SCRAMAuthentifizierungSASL/verwendet, müssen Sie ein SourceAccessConfigurationObjekt angeben, das ein Secrets Manager-Geheimnis spezifiziert, SASL_SCRAM_512_AUTH und ein Secrets Manager Manager-Geheimnis angebenARN.

Es gibt zwei Möglichkeiten, Geheimnisse für kontoübergreifende Zuordnungen von MSK Amazon-Ereignisquellen mit SASL der /-Authentifizierung zu verwenden: SCRAM

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
Beispiel — Erstellen Sie eine kontoübergreifende Zuordnung von Ereignisquellen für einen Cluster, der die TLS M-Authentifizierung verwendet

Wenn der Cluster die TLSM-Authentifizierung verwendet, müssen Sie ein SourceAccessConfigurationObjekt angeben, das einen Secrets Manager Manager-Schlüssel spezifiziert CLIENT_CERTIFICATE_TLS_AUTHARN. Das Secret kann im Clusterkonto oder im Lambda-Funktionskonto gespeichert werden.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Verwenden eines MSK Amazon-Clusters als Ereignisquelle

Wenn Sie Ihren Apache Kafka- oder MSK Amazon-Cluster als Auslöser für Ihre Lambda-Funktion hinzufügen, wird der Cluster als Ereignisquelle verwendet.

Lambda liest Ereignisdaten aus den Kafka-Themen, die Sie wie Topics in einer CreateEventSourceMappingAnfrage angeben, basierend auf den StartingPosition von Ihnen angegebenen Themen. Nach erfolgreicher Verarbeitung wird Ihr Kafka-Thema Ihrem Kafka-Cluster zugeordnet.

Wenn Sie die StartingPosition als LATEST angeben, beginnt Lambda mit dem Lesen der neuesten Nachricht in jeder zum Thema gehörenden Partition. Da es nach der Auslöserkonfiguration eine gewisse Verzögerung geben kann, bevor Lambda mit dem Lesen der Nachrichten beginnt, liest Lambda keine Nachrichten, die innerhalb dieses Zeitraums erzeugt wurden.

Lambda liest Nachrichten sequentiell für jede Kafka-Themenpartition. Eine einzelne Lambda-Nutzlast kann Nachrichten von mehreren Partitionen enthalten. Wenn mehr Datensätze verfügbar sind, setzt Lambda die Verarbeitung von Datensätzen stapelweise fort, basierend auf dem BatchSize Wert, den Sie in einer CreateEventSourceMappingAnfrage angeben, bis Ihre Funktion das Thema eingeholt hat.

Nachdem Lambda jeden Batch verarbeitet hat, werden die Offsets der Nachrichten in diesem Batch festgeschrieben. Wenn Ihre Funktion einen Fehler für eine der Nachrichten in einem Batch zurückgibt, wiederholt Lambda den gesamten Nachrichtenbatch, bis die Verarbeitung erfolgreich ist oder die Nachrichten ablaufen. Sie können Datensätze, bei denen alle Wiederholungsversuche fehlschlagen, zur späteren Verarbeitung an ein Ziel senden, wenn ein Fehler aufgetreten ist.

Anmerkung

Während Lambda-Funktionen in der Regel ein maximales Timeout-Limit von 15 Minuten haben, unterstützen Ereignisquellenzuordnungen für Amazon, selbstverwaltetes Apache KafkaMSK, Amazon DocumentDB und Amazon MQ für ActiveMQ und RabbitMQ nur Funktionen mit einem maximalen Timeout-Limit von 14 Minuten. Diese Einschränkung stellt sicher, dass die Ereignisquellenzuordnung Funktionsfehler und Wiederholungsversuche ordnungsgemäß verarbeiten kann.

Startpositionen für Abfragen und Streams

Beachten Sie, dass die Stream-Abfrage bei der Erstellung und Aktualisierung der Zuordnung von Ereignisquellen letztendlich konsistent ist.

  • Bei der Erstellung der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis mit der Abfrage von Ereignissen aus dem Stream begonnen wird.

  • Bei Aktualisierungen der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis die Abfrage von Ereignissen aus dem Stream gestoppt und neu gestartet wird.

Dieses Verhalten bedeutet, dass, wenn Sie LATEST als Startposition für den Stream angeben, die Zuordnung von Ereignisquellen bei der Erstellung oder Aktualisierung möglicherweise Ereignisse übersieht. Um sicherzustellen, dass keine Ereignisse übersehen werden, geben Sie die Startposition des Streams als TRIM_HORIZON oder AT_TIMESTAMP an.

CloudWatch Amazon-Metriken

Lambda gibt die Metrik OffsetLag aus, während Ihre Funktion Datensätze verarbeitet. Der Wert dieser Metrik ist die Differenz (der Versatz) zwischen dem letzten Datensatz, der ins Kafka-Ereignisquellen-Thema geschrieben wurde, und dem letzten Datensatz, den die Konsumentengruppe Ihrer Funktion verarbeitet hat. Sie können mit OffsetLag die Latenz zwischen dem Hinzufügen eines Datensatzes und der Verarbeitung des Datensatzes durch Ihre Konsumentengruppe abschätzen.

Ein zunehmender Trend in OffsetLag kann auf Probleme mit Pollern in der Konsumentengruppe Ihrer Funktion hinweisen. Weitere Informationen finden Sie unter Metriken für Lambda-Funktionen anzeigen.

Automatische Skalierung der MSK Amazon-Eventquelle

Wenn Sie zum ersten Mal eine MSK Amazon-Ereignisquelle erstellen, weist Lambda einen Verbraucher für die Verarbeitung aller Partitionen im Kafka-Thema zu. Jeder Verbraucher hat mehrere Prozessoren, die parallel laufen, um erhöhte Workloads zu bewältigen. Lambda skaliert außerdem je nach Workload automatisch die Anzahl der Verbraucher nach oben oder unten. Um die Nachrichtenreihenfolge in jeder Partition beizubehalten, ist die maximale Anzahl von Verbrauchern pro Partition im Thema ein Verbraucher pro Partition.

In Intervallen von einer Minute wertet Lambda die Verbraucher-Offsetverzögerung aller Partitionen im Thema aus. Wenn die Verzögerung zu hoch ist, empfängt die Partition Nachrichten schneller als Lambda sie verarbeiten kann. Bei Bedarf fügt Lambda dem Thema Verbraucher hinzu oder entfernt sie. Der Skalierungsprozess zum Hinzufügen oder Entfernen von Verbrauchern erfolgt innerhalb von drei Minuten nach der Bewertung.

Wenn Ihre Lambda-Zielfunktion gedrosselt ist, verringert Lambda die Anzahl der Verbraucher. Diese Aktion reduziert die Workload für die Funktion, indem die Anzahl der Nachrichten reduziert wird, die Verbraucher abrufen und an die Funktion senden können.

Um den Durchsatz Ihres Kafka-Themas zu überwachen, sehen Sie sich die Offset-Verzögerungsmetrik an, die Lambda emittiert, während Ihre Funktion Datensätze verarbeitet.

Um zu überprüfen, wie viele Funktionsaufrufe parallel erfolgen, können Sie auch die Parallelitätsmetriken für Ihre Funktion überwachen.