Verwenden von Lambda mit Amazon MSK - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden von Lambda mit Amazon MSK

Anmerkung

Wenn Sie Daten an ein anderes Ziel als eine Lambda-Funktion senden oder die Daten vor dem Senden anreichern möchten, finden Sie weitere Informationen unter Amazon EventBridge Pipes.

Amazon Managed Streaming for Apache Kafka (Amazon MSK) ist ein vollständig verwalteter Service, mit dem Sie Anwendungen erstellen und ausführen können, die Apache Kafka zum Verarbeiten von Streaming-Daten verwenden. Amazon MSK vereinfacht die Einrichtung, Skalierung und Verwaltung von Clustern, auf denen Kafka ausgeführt wird. Amazon MSK erleichtert auch die Konfiguration Ihrer Anwendung für mehrere Availability Zones und für die Sicherheit mit AWS Identity and Access Management (IAM). Amazon MSK unterstützt mehrere Open-Source-Versionen von Kafka.

Amazon MSK als Ereignisquelle funktioniert ähnlich wie die Verwendung von Amazon Simple Queue Service (Amazon SQS) oder Amazon Kinesis. Lambda fragt intern neue Nachrichten von der Ereignisquelle ab und ruft dann synchron die Ziel-Lambda-Funktion auf. Lambda liest die Nachrichten in Batches und stellt diese Ihrer Funktion als Ereignisnutzlast zur Verfügung. Die maximale Batchgröße ist konfigurierbar (Standardeinstellung: 100 Nachrichten). Weitere Informationen finden Sie unter Batching-Verhalten.

Anmerkung

Während Lambda-Funktionen in der Regel ein maximales Timeout-Limit von 15 Minuten haben, unterstützen Ereignisquellenzuordnungen für Amazon MSK, selbstverwaltetes Apache Kafka, Amazon DocumentDB, Amazon MQ für ActiveMQ und RabbitMQ nur Funktionen mit einem maximalen Timeout-Limit von 14 Minuten. Diese Einschränkung stellt sicher, dass die Ereignisquellenzuordnung Funktionsfehler und Wiederholungsversuche ordnungsgemäß verarbeiten kann.

Lambda liest die Nachrichten nacheinander für jede Partition. Eine einzelne Lambda-Nutzlast kann Nachrichten von mehreren Partitionen enthalten. Nachdem Lambda jeden Batch verarbeitet hat, werden die Offsets der Nachrichten in diesem Batch festgeschrieben. Wenn Ihre Funktion einen Fehler für eine der Nachrichten in einem Batch zurückgibt, wiederholt Lambda den gesamten Nachrichtenbatch, bis die Verarbeitung erfolgreich ist oder die Nachrichten ablaufen.

Warnung

Lambda-Ereignisquellenzuordnungen verarbeiten jedes Ereignis mindestens einmal, und eine doppelte Verarbeitung von Batches kann auftreten. Um potenzielle Probleme im Zusammenhang mit doppelten Ereignissen zu vermeiden, empfehlen wir dringend, Ihren Funktionscode idempotent zu machen. Weitere Informationen finden Sie unter Wie mache ich meine Lambda-Funktion idempotent? im - AWS Wissenscenter.

Ein Beispiel für die Konfiguration von Amazon MSK als Ereignisquelle finden Sie unter Verwenden von Amazon MSK als Ereignisquelle für AWS Lambda im AWS Compute Blog. Ein vollständiges Tutorial finden Sie unter Amazon-MSK-Lambda-Integration in den Amazon-MSK-Übungen.

Beispielereignis

Lambda sendet den Batch von Nachrichten im Ereignisparameter, wenn es Ihre Funktion aufruft. Die Ereignisnutzlast enthält ein Array von Meldungen. Jedes Array-Element enthält Details zum Amazon-MSK-Thema und zur Partitions-ID sowie einen Zeitstempel und eine base64-codierte Nachricht.

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

MSK-Cluster-Authentifizierung

Lambda benötigt eine Berechtigung, um auf den Amazon-MSK-Cluster zuzugreifen, Datensätze abzurufen und andere Aufgaben auszuführen. Amazon MSK unterstützt mehrere Optionen zur Steuerung des Client-Zugriffs auf den MSK-Cluster.

Nicht authentifizierter Zugriff

Wenn keine Clients über das Internet auf den Cluster zugreifen, können Sie einen nicht authentifizierten Zugriff verwenden.

SASL/SCRAM-Authentifizierung

Amazon MSK unterstützt die Authentifizierung mit Transport Layer Security (TLS)-Verschlüsselung von Simple Authentication and Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM). Damit Lambda eine Verbindung mit dem Cluster herstellen kann, speichern Sie die Authentifizierungsanmeldeinformationen (Benutzername und Passwort) in einem - AWS Secrets Manager Secret.

Weitere Informationen zur Verwendung von Secrets Manager finden Sie unter Benutzername und Passwortauthentifizierung mit AWS Secrets Manager im Entwicklerhandbuch für Amazon Managed Streaming for Apache Kafka.

Amazon MSK unterstützt die SASL/PLAIN-Authentifizierung nicht.

Auf IAM-Rolle basierende Authentifizierung

Sie können IAM verwenden, um die Identität von Clients zu authentifizieren, die sich mit dem MSK-Cluster verbinden. Wenn die IAM-Authentifizierung auf Ihrem MSK-Cluster aktiv ist und Sie kein Geheimnis für die Authentifizierung angeben, verwendet Lambda automatisch standardmäßig die IAM-Authentifizierung. Verwenden Sie die IAM-Konsole oder API, um Richtlinien zu erstellen und zu implementieren, die auf Benutzern oder Rollen basieren. Weitere Informationen finden Sie unter IAM-Zugriffskontrolle im Entwicklerhandbuch für Amazon Managed Streaming for Apache Kafka.

Fügen Sie die folgenden Berechtigungen zur Ausführungsrolle Ihrer Funktion hinzu, um Lambda zu erlauben, sich mit dem MSK-Cluster zu verbinden, Datensätze zu lesen und andere erforderliche Aktionen auszuführen.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

Sie können diese Berechtigungen für einen bestimmten Cluster, ein bestimmtes Thema und eine bestimmte Gruppe einteilen. Weitere Informationen finden Sie unter Amazon-MSK-Kafka-Aktionen im Entwicklerhandbuch für Amazon Managed Streaming for Apache Kafka.

Gegenseitige TLS-Authentifizierung

Gegenseitige TLS (mTLS) bietet eine bidirektionale Authentifizierung zwischen Client und Server. Der Client sendet ein Zertifikat an den Server, damit der Server den Client überprüfen kann, und der Server sendet ein Zertifikat an den Client, damit der Client den Server überprüfen kann.

Für Amazon MSK fungiert Lambda als der Client. Sie konfigurieren ein Client-Zertifikat (als Secret in Secrets Manager), um Lambda für die Broker in Ihrem MSK-Cluster zu authentifizieren. Das Client-Zertifikat muss von einer Zertifizierungsstelle im Trust Store des Servers signiert sein. Der MSK-Cluster sendet ein Serverzertifikat an Lambda, um die Broker für Lambda zu authentifizieren. Das Serverzertifikat muss von einer Zertifizierungsstelle (CA) signiert sein, die sich im AWS Trust Store befindet.

Informationen dazu, wie Sie ein Client-Zertifikat generieren, finden Sie unter Vorstellung von gegenseitiger TLS-Authentifizierung für Amazon MSK als Ereignisquelle.

Amazon MSK unterstützt keine selbstsignierten Serverzertifikate, da alle Broker in Amazon MSK öffentliche Zertifikate verwenden, die von Amazon-TrustServices-Zertifizierungsstellen signiert sind, denen Lambda standardmäßig vertraut.

Weitere Informationen über mTLS für Amazon MSK finden Sie unter Gegenseitige TLS-Authentifizierung im Entwicklerhandbuch für Amazon Managed Streaming for Apache Kafka.

Konfigurieren des mTLS-Secrets

Das Secret CLIENT_CERTIFICATE_TLS_AUTH erfordert ein Zertifikatfeld und ein Feld für einen privaten Schlüssel. Für einen verschlüsselten privaten Schlüssel erfordert das Secret ein Passwort für den privaten Schlüssel. Sowohl das Zertifikat als auch der private Schlüssel müssen im PEM-Format vorliegen.

Anmerkung

Lambda unterstützt die PBES1-Algorithmen (aber nicht PBES2) zur Verschlüsselung von privaten Schlüsseln.

Das Zertifikatfeld muss eine Liste von Zertifikaten enthalten, beginnend mit dem Client-Zertifikat, gefolgt von etwaigen Zwischenzertifikaten und endend mit dem Root-Zertifikat. Jedes Zertifikat muss in einer neuen Zeile mit der folgenden Struktur beginnen:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager unterstützt Secrets von bis zu 65 536 Bytes, was genügend Platz für lange Zertifikatsketten bietet.

Der private Schlüssel muss im Format PKCS #8 mit folgender Struktur vorliegen:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

Verwenden Sie für einen verschlüsselten privaten Schlüssel die folgende Struktur:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

Im folgenden Beispiel sehen Sie den Inhalt eines Secrets für mTLS-Authentifizierung mit einem verschlüsselten privaten Schlüssel. Fügen Sie für einen verschlüsselten privaten Schlüssel das Passwort für den privaten Schlüssel in das Secret ein.

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

So wählt Lambda einen Bootstrap-Broker

Lambda wählt einen Bootstrap-Broker basierend auf den auf Ihrem Cluster verfügbaren Authentifizierungsmethoden aus und ob Sie ein Geheimnis für die Authentifizierung angeben.. Wenn Sie ein Geheimnis für MTLs oder SASL/SCRAM angeben, wählt Lambda automatisch diese Authentifizierungsmethode. Wenn Sie kein Geheimnis angeben, wählt Lambda die stärkste Authentifizierungsmethode aus, die in Ihrem Cluster aktiv ist. Im Folgenden ist die Reihenfolge der Priorität aufgeführt, in der Lambda einen Broker auswählt, von der stärksten zur schwächsten Authentifizierung:

  • mTLS (Geheimnis für mTLS bereitgestellt)

  • SASL/SCRAM (Geheimnis für SASL/SCRAM bereitgestellt)

  • SASL IAM (kein Geheimnis angegeben und IAM-Authentifizierung aktiv)

  • Nicht authentifiziertes TLS (kein Geheimnis bereitgestellt und IAM-Authentifizierung nicht aktiv)

  • Klartext (kein Geheimnis angegeben und sowohl IAM-Authentifizierung als auch nicht authentifiziertes TLS sind nicht aktiv)

Anmerkung

Wenn Lambda keine Verbindung zum sichersten Brokertyp herstellen kann, versucht Lambda nicht, eine Verbindung zu einem anderen (schwächeren) Brokertyp herzustellen. Wenn Sie möchten, dass Lambda einen schwächeren Brokertyp wählt, deaktivieren Sie alle stärkeren Authentifizierungsmethoden in Ihrem Cluster.

Verwalten von API-Zugriff und -Berechtigungen

Neben dem Zugriff auf den Amazon-MSK-Cluster benötigt Ihre Funktion auch Berechtigungen, um verschiedene Amazon-MSK-API-Aktionen auszuführen. Sie fügen diese Berechtigungen zur Ausführungsrolle der Funktion hinzu. Wenn Ihre Benutzer Zugriff auf Amazon-MSK-API-Aktionen benötigen, fügen Sie die erforderlichen Berechtigungen zur Identitätsrichtlinie für den Benutzer oder die Rolle hinzu.

Sie können Ihrer Ausführungsrolle jede der folgenden Berechtigungen manuell hinzufügen. Alternativ können Sie die von AWS verwaltete Richtlinie AWSLambdaMSKExecutionRole Ihrer Ausführungsrolle zuordnen. Die AWSLambdaMSKExecutionRole-Richtlinie enthält alle unten aufgeführten erforderlichen API-Aktionen und VPC-Berechtigungen.

Erforderliche Berechtigungen für Ausführungsrolle der Lambda-Funktion

Um Protokolle in einer Protokollgruppe in Amazon CloudWatch Logs zu erstellen und zu speichern, muss Ihre Lambda-Funktion über die folgenden Berechtigungen in ihrer Ausführungsrolle verfügen:

Damit Lambda in Ihrem Namen auf Ihren Amazon-MSK-Cluster zugreifen kann, muss Ihre Lambda-Funktion über die folgenden Berechtigungen in ihrer Ausführungsrolle verfügen:

Sie müssen nur entweder kafka:DescribeCluster oder kafka:DescribeClusterV2 hinzufügen. Für bereitgestellte MSK-Cluster funktioniert jede der beiden Berechtigungen. Für Serverless-MSK-Cluster müssen Sie kafka:DescribeClusterV2 verwenden.

Anmerkung

Lambda plant, schließlich die kafka:DescribeCluster-Genehmigung aus dieser AWSLambdaMSKExecutionRole-Richtlinie zu entfernen. Wenn Sie diese Richtlinie verwenden, sollten Sie alle Anwendungen, die kafka:DescribeCluster verwendet, auf kafka:DescribeClusterV2 umstellen.

VPC-Berechtigungen

Wenn nur Benutzer innerhalb einer VPC auf Ihren Amazon-MSK-Cluster zugreifen können, muss Ihre Lambda-Funktion die Berechtigung zum Zugriff auf Ihre Amazon-VPC-Ressourcen haben. Zu diesen Ressourcen gehören Ihre VPC, Subnetze, Sicherheitsgruppen und Netzwerkschnittstellen. Um auf diese Ressourcen zuzugreifen, muss die Ausführungsrolle Ihrer Funktion über die folgenden Berechtigungen verfügen. Diese Berechtigungen sind in der von AWSLambdaMSKExecutionRole AWS verwalteten Richtlinie enthalten.

Optionale Lambda-Funktionsberechtigungen

Ihre Lambda-Funktion benötigt möglicherweise auch Berechtigungen für Folgendes:

  • Greifen Sie auf Ihr SCRAM-Secret zu, wenn Sie die SASL/SCRAM-Authentifizierung verwenden.

  • Beschreiben Ihres Secrets-Manager-Secrets

  • Greifen Sie auf Ihren AWS Key Management Service (AWS KMS) vom Kunden verwalteten Schlüssel zu.

  • Senden von Datensätzen zu fehlgeschlagenen Aufrufen an ein Ziel

Secrets Manager und AWS KMS Berechtigungen

Abhängig von der Art der Zugriffskontrolle, die Sie für Ihre Amazon-MSK-Broker konfigurieren, benötigt Ihre Lambda-Funktion möglicherweise die Berechtigung, auf Ihr SCRAM-Secret (bei Verwendung der SASL/SCRAM-Authentifizierung) oder Secrets-Manager-Secret zuzugreifen, um Ihren vom AWS KMS Kunden verwalteten Schlüssel zu entschlüsseln. Um auf diese Ressourcen zuzugreifen, muss die Ausführungsrolle Ihrer Funktion die folgenden Berechtigungen besitzen:

Senden von Datensätzen an ein Ziel

Wenn Sie Datensätze zu fehlgeschlagenen Aufrufen an ein Ausfallziel senden möchten, muss Ihre Lambda-Funktion über die Berechtigung verfügen, diese Datensätze zu senden. Für Kafka-Ereignisquellenzuordnungen können Sie zwischen einem Amazon-SNS-Thema, einer Amazon-SQS-Warteschlange oder einem Amazon-S3-Bucket als Ziel wählen. Um Datensätze an ein SNS-Thema zu senden, muss die Ausführungsrolle Ihrer Funktion die folgende Berechtigung besitzen:

Um Datensätze an eine SQS-Warteschlange zu senden, muss die Ausführungsrolle Ihrer Funktion die folgenden Berechtigungen besitzen.

Um Datensätze an einen S3-Bucket zu senden, muss die Ausführungsrolle Ihrer Funktion die folgenden Berechtigungen besitzen.

Wenn Sie einen KMS-Schlüssel für Ihr Ziel konfiguriert haben, benötigt Lambda außerdem je nach Zieltyp folgende Berechtigungen:

  • Wenn Sie die Verschlüsselung mit Ihrem eigenen KMS-Schlüssel für ein S3-Ziel aktiviert haben, ist kms:GenerateDataKey erforderlich. Wenn sich der KMS-Schlüssel und das S3-Bucket-Ziel in einem anderen Konto befinden als Ihre Lambda-Funktion und Ausführungsrolle, konfigurieren Sie den KMS-Schlüssel so, dass er der Ausführungsrolle vertraut, um kms zuzulassen:GenerateDataKey.

  • Wenn Sie die Verschlüsselung mit Ihrem eigenen KMS-Schlüssel für das SQS-Ziel aktiviert haben, sind kms:Decrypt und kms:GenerateDataKey erforderlich. Wenn sich der KMS-Schlüssel und das SQS-Warteschlangenziel in einem anderen Konto als Ihre Lambda-Funktion und Ausführungsrolle befinden, konfigurieren Sie den KMS-Schlüssel so, dass er der Ausführungsrolle vertraut, um kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey und kms:ReEncrypt zuzulassen.

  • Wenn Sie die Verschlüsselung mit Ihrem eigenen KMS-Schlüssel für das SNS-Ziel aktiviert haben, sind kms:Decrypt und kms:GenerateDataKey erforderlich. Wenn sich der KMS-Schlüssel und das SNS-Themenziel in einem anderen Konto als Ihre Lambda-Funktion und Ausführungsrolle befinden, konfigurieren Sie den KMS-Schlüssel so, dass er der Ausführungsrolle vertraut, um kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey und kms:ReEncrypt zuzulassen.

Hinzufügen von Berechtigungen zu Ihrer Ausführungsrolle

Gehen Sie wie folgt vor, um die AWS verwaltete Richtlinie mithilfe der IAM-Konsole AWSLambdaMSKExecutionRole zu Ihrer Ausführungsrolle hinzuzufügen.

So fügen Sie eine von AWS verwaltete Richtlinie hinzu
  1. Öffnen Sie die Seite Richtlinien in der IAM-Konsole.

  2. Geben Sie im Suchfeld den Namen der Richtlinie ein (AWSLambdaMSKExecutionRole).

  3. Wählen Sie die Richtlinie aus der Liste aus und wählen Sie dann Richtlinienaktionen, Anhängen aus.

  4. Wählen Sie auf der Seite Richtlinie anfügen Ihre Ausführungsrolle aus der Liste aus, und wählen Sie dann Richtlinie anfügen aus.

Gewähren von Benutzerzugriff mit einer IAM-Richtlinie

Standardmäßig haben Benutzer und Rollen keine Berechtigung zum Ausführen von Amazon-MSK-API-Operationen. Um Benutzern in Ihrem Unternehmen oder Konto Zugriff zu gewähren, können Sie eine identitätsbasierte Richtlinie hinzufügen oder aktualisieren. Weitere Informationen finden Sie unter Beispiele für identitätsbasierte Amazon-MSK-Richtlinien im Entwicklerhanbuch für Amazon Managed Streaming for Apache Kafka.

Authentifizierungs- und Autorisierungsfehler

Wenn eine der Berechtigungen, die zum Verbrauchen von Daten aus dem Amazon-MSK-Cluster erforderlich sind, fehlt, zeigt Lambda eine der folgenden Fehlermeldungen in der Ereignisquellenzuordnung unter anLastProcessingResult.

Cluster konnte Lambda nicht autorisieren

Bei SASL/SCRAM oder mTLS weist dieser Fehler darauf hin, dass der angegebene Benutzer nicht über alle nachfolgenden erforderlichen Berechtigungen für die Kafka-Zugriffskontrollliste (ACL) verfügt:

  • DescribeConfigs Cluster

  • Beschreiben von Gruppe

  • Gruppe lesen

  • Thema beschreiben

  • Thema lesen

Zur IAM-Zugriffskontrolle fehlen der Ausführungsrolle Ihrer Funktion eine oder mehrere der Berechtigungen, die für den Zugriff auf die Gruppe oder das Thema erforderlich sind. Prüfen Sie die Liste der erforderlichen Berechtigungen in Auf IAM-Rolle basierende Authentifizierung.

Wenn Sie Kafka-ACLs oder eine IAM-Richtlinie mit den erforderlichen Kafka-Cluster-Berechtigungen erstellen, geben Sie das Thema und die Gruppe als Ressourcen an. Der Themenname muss mit dem Thema in der Ereignisquellenzuordnung übereinstimmen. Der Gruppenname muss mit der UUID der Ereignisquellenzuordnung übereinstimmen.

Nachdem Sie der Ausführungsrolle die erforderlichen Berechtigungen hinzugefügt haben, kann es einige Minuten dauern, bis die Änderungen wirksam werden.

SASL-Authentifizierung fehlgeschlagen

Bei SASL/SCRAM weist dieser Fehler darauf hin, dass der angegebene Benutzername und das Passwort ungültig sind.

Zur IAM-Zugriffskontrolle fehlt der Ausführungsrolle die Berechtigung kafka-cluster:Connect für den MSK-Cluster. Fügen Sie der Rolle diese Berechtigung hinzu und geben Sie den Amazon-Ressourcenname (ARN) des Clusters als Ressource an.

Dieser Fehler wird Ihnen möglicherweise in zeitlichen Abständen angezeigt. Der Cluster lehnt Verbindungen ab, wenn die Anzahl der TCP-Verbindungen das Amazon-MSK-Servicekontingent überschreitet. Lambda zieht sich zurück und versucht es erneut, bis eine Verbindung erfolgreich ist. Nachdem Lambda eine Verbindung zum Cluster hergestellt hat und nach Datensätzen abfragt, ändert sich das letzte Verarbeitungsergebnis zu OK.

Server konnte Lambda nicht authentifizieren

Dieser Fehler weist darauf hin, dass die Amazon-MSK-Kafka-Broker sich bei Lambda nicht authentifizieren konnten. Dieser Fehler kann aus folgenden Gründen auftreten:

  • Sie haben kein Client-Zertifikat für die mTLS-Authentifizierung bereitgestellt.

  • Sie haben ein Client-Zertifikat bereitgestellt, aber die Broker sind nicht für die Verwendung von mTLS konfiguriert.

  • Die Broker vertrauen einem Client-Zertifikat nicht.

Bereitgestelltes Zertifikat oder bereitgestellter privater Schlüssel ist ungültig

Dieser Fehler weist darauf hin, dass der Amazon-MSK-Konsument das bereitgestellte Zertifikat oder den bereitgestellten privaten Schlüssel nicht verwenden konnte. Stellen Sie sicher, dass das Zertifikat und der Schlüssel das PEM-Format haben und die Verschlüsselung des privaten Schlüssels einen PBES1-Algorithmus nutzt.

Netzwerkkonfiguration

Wenn Sie einen Amazon-VPC-Zugriff für Ihre Kafka-Broker konfigurieren, benötigt Lambda Zugriff auf die Amazon-VPC-Ressourcen, in denen sich Ihr Kafka-Cluster befindet. Wir empfehlen Ihnen, AWS PrivateLink VPC-Endpunkte für Lambda und AWS Security Token Service () bereitzustellen AWS STS. Wenn der Broker Authentifizierung verwendet, stellen Sie auch einen VPC-Endpunkt für Secrets Manager bereit. Wenn Sie ein Ausfallziel konfiguriert haben, müssen Sie auch einen VPC-Endpunkt für den Ziel-Service bereitstellen.

Alternativ können Sie sicherstellen, dass die VPC, die Ihrem Kafka-Cluster zugeordnet ist, ein NAT-Gateway pro öffentlichem Subnetz enthält. Weitere Informationen finden Sie unter Internet- und Servicezugriff für VPC-verbundene Funktionen.

Wenn Sie VPC-Endpunkte verwenden, müssen Sie sie auch so konfigurieren, dass private DNS-Namen aktiviert werden.

Konfigurieren Sie Ihre Amazon-VPC-Sicherheitsgruppen (mindestens) mit den folgenden Regeln:

  • Regeln für eingehenden Datenverkehr – Lassen Sie den gesamten Datenverkehr zum Amazon-MSK-Broker-Port (9092 für Klartext, 9094 für TLS, 9096 für SASL, 9098 für IAM) für die Sicherheitsgruppen zu, die für Ihre Ereignisquelle angegeben sind.

  • Ausgehende Regeln - Erlauben Sie allen Datenverkehr auf Port 443 für alle Ziele. Lassen Sie den gesamten Datenverkehr für den Amazon-MSK-Broker-Port (9092 für Klartext, 9094 für TLS, 9096 für SASL, 9098 für IAM) für die Sicherheitsgruppen zu, die für Ihre Ereignisquelle angegeben sind.

  • Wenn Sie VPC-Endpunkte anstelle eines NAT-Gateways verwenden, müssen die Sicherheitsgruppen, die mit den VPC-Endpunkten verknüpft sind, den gesamten eingehenden Datenverkehr für Port 443 von den Sicherheitsgruppen der Ereignisquelle zulassen.

Anmerkung

Ihre Amazon-VPC-Konfiguration ist über die Amazon MSK API erkennbar. Sie müssen sie während der Einrichtung nicht mit dem Befehl create-event-source-mapping konfigurieren.

Weitere Informationen zum Konfigurieren des Netzwerks finden Sie unter Einrichten von AWS Lambda mit einem Apache-Kafka-Cluster innerhalb einer VPC im AWS -Compute-Blog.

Hinzufügen von Amazon MSK als Ereignisquelle

Um ein Ereignisquellen-Mapping zu erstellen, fügen Sie Amazon MSK als Lambda-Funktionsauslöser mithilfe der Lambda-Konsole, eines AWS -SDKs oder AWS Command Line Interface (AWS CLI) hinzu. Beachten Sie, dass Lambda beim Hinzufügen von Amazon MSK als Auslöser die VPC-Einstellungen des Amazon-MSK-Clusters annimmt, nicht die VPC-Einstellungen der Lambda-Funktion.

In diesem Abschnitt wird beschrieben, wie Sie mit der Lambda-Konsole und AWS CLI ein Ereignisquellen-Zuweisung erstellen.

Voraussetzungen

  • Ein Amazon-MSK-Cluster und ein Kafka-Thema. Weitere Informationen finden Sie unter Erste Schritte mit Amazon MSK im Entwicklerhandbuch für Amazon Managed Streaming for Apache Kafka.

  • Eine Ausführungsrolle mit der Berechtigung zum Zugriff auf die AWS Ressourcen, die Ihr MSK-Cluster verwendet.

Anpassbare Konsumentengruppen-ID

Wenn Sie Kafka als Ereignisquelle einrichten, können Sie eine Konsumentengruppen-ID angeben. Diese Konsumentengruppen-ID ist eine vorhandene Kennung für die Kafka-Konsumentengruppe, der Ihre Lambda-Funktion beitreten soll. Mit diesem Feature können Sie alle laufenden Kafka-Datensatzverarbeitungs-Setups nahtlos von anderen Verbrauchern auf Lambda migrieren.

Wenn Sie eine Konsumentengruppen-ID angeben und sie innerhalb dieser Konsumentengruppe weitere aktive Poller gibt, verteilt Kafka Nachrichten an alle Konsumenten. Mit anderen Worten, Lambda erhält nicht alle Nachrichten zum Thema Kafka. Wenn Sie möchten, dass Lambda alle Nachrichten im Thema verarbeitet, deaktivieren Sie alle anderen Poller in dieser Konsumentengruppe.

Wenn Sie außerdem eine Konsumentengruppen-ID angeben und Kafka eine gültige vorhandene Konsumentengruppe mit derselben ID findet, ignoriert Lambda die StartingPosition-Parameter für Ihre Ereignisquellen-Zuweisung. Stattdessen beginnt Lambda mit der Verarbeitung von Datensätzen gemäß dem zugesagten Versatz der Konsumentengruppe. Wenn Sie eine Konsumentengruppen-ID angeben und Kafka keine vorhandene Konsumentengruppe finden kann, konfiguriert Lambda Ihre Ereignisquelle mit der angegebenen StartingPosition.

Die Konsumentengruppen-ID, die Sie angeben, muss unter all Ihren Kafka-Ereignisquellen eindeutig sein. Nachdem Sie eine Kafka-Ereignisquellenzuordnung mit der angegebenen Konsumentengruppen-ID erstellt haben, können Sie diesen Wert nicht aktualisieren.

Ausfallziele

Um Datensätze zu fehlgeschlagenen Aufrufen oder übergroßen Nutzlasten aus Ihrer Kafka-Ereignisquelle beizubehalten, konfigurieren Sie für Ihre Funktion ein Ziel für den Ausfall. Wenn ein Aufruf fehlschlägt, sendet Lambda einen JSON-Datensatz mit Details zum Aufruf an Ihr Ziel.

Sie können zwischen einem Amazon-SNS-Thema, einer Amazon-SQS-Warteschlange oder einem Amazon-S3-Bucket als Ihr Ziel wählen. Für die Zieltypen SNS-Thema oder SQS-Warteschlange sendet Lambda die Datensatzmetadaten an das Ziel. Für S3-Bucket-Ziele sendet Lambda den gesamten Aufrufdatensatz zusammen mit den Metadaten an das Ziel.

Damit Lambda erfolgreich Datensätze an das von Ihnen ausgewählte Ziel senden kann, stellen Sie sicher, dass die Ausführungsrolle Ihrer Funktion die relevanten Berechtigungen enthält. In der Tabelle wird auch beschrieben, wie jeder Zieltyp den JSON-Aufrufdatensatz empfängt.

Zieltyp Unterstützt für die folgenden Ereignisquellen Erforderliche Berechtigungen Zielspezifisches JSON-Format

Amazon-SQS-Warteschlange

  • Kinesis

  • DynamoDB

  • Selbstverwaltetes Apache Kafka und verwaltetes Apache Kafka

Lambda übergibt die Aufrufdatensatz-Metadaten als Message an das Ziel.

Amazon SNS-Thema

  • Kinesis

  • DynamoDB

  • Selbstverwaltetes Apache Kafka und verwaltetes Apache Kafka

Lambda übergibt die Aufrufdatensatz-Metadaten als Message an das Ziel.

Amazon S3-Bucket

  • Selbstverwaltetes Apache Kafka und verwaltetes Apache Kafka

Lambda speichert den Aufrufdatensatz zusammen mit den zugehörigen Metadaten am Ziel.

Tipp

Als bewährte Methode sollten Sie die nur für Ihre Ausführungsrolle erforderlichen Mindestberechtigungen angeben.

SNS- und SQS-Ziele

Das folgende Beispiel zeigt, was Lambda bei einem fehlgeschlagenen Aufruf der Kafka-Ereignisquelle an ein SNS-Thema oder eine SQS-Warteschlange sendet. Jeder der Schlüssel unter recordsInfo enthält sowohl das Kafka-Thema als auch die Kafka-Partition, getrennt durch einen Bindestrich. Bei dem Schlüssel "Topic-0" handelt es sich beispielsweise bei Topic um das Kafka-Thema und bei 0 um die Partition. Für jedes Thema und jede Partition können Sie die Offsets und Zeitstempeldaten verwenden, um die ursprünglichen Aufrufdatensätze zu finden.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

S3-Ziele

Für S3-Ziele sendet Lambda den gesamten Aufrufdatensatz zusammen mit den Metadaten an das Ziel. Das folgende Beispiel zeigt, was Lambda bei einem fehlgeschlagenen Aufruf der Kafka-Ereignisquelle an ein S3-Bucket-Ziel sendet. Zusätzlich zu allen Feldern aus dem vorherigen Beispiel für SQS- und SNS-Ziele enthält das Feld payload den ursprünglichen Aufrufdatensatz als maskierte JSON-Zeichenfolge.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
Tipp

Wir empfehlen außerdem, die S3-Versionsverwaltung in Ihrem Ziel-Bucket zu aktivieren.

Konfigurieren von Ausfallzielen

Gehen Sie folgendermaßen vor, um ein Ausfallziel mit der Konsole zu konfigurieren:

  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie eine Funktion aus.

  3. Wählen Sie unter Function overview (Funktionsübersicht) die Option Add destination (Ziel hinzufügen).

  4. Wählen Sie als Quelle die Option Aufruf der Zuordnung von Ereignisquellen aus.

  5. Wählen Sie für die Zuordnung von Ereignisquellen eine Ereignisquelle aus, die für diese Funktion konfiguriert ist.

  6. Wählen Sie für Bedingung die Option Bei Ausfall aus. Für Aufrufe zur Zuordnung von Ereignisquellen ist dies die einzig akzeptierte Bedingung.

  7. Wählen Sie unter Zieltyp den Zieltyp aus, an den Lambda Aufrufdatensätze sendet.

  8. Wählen Sie unter Destination (Ziel) eine Ressource aus.

  9. Wählen Sie Save aus.

Sie können mit der Lambda-API auch ein Ausfallziel konfigurieren. Der folgende CreateEventSourceMapping CLI-Befehl fügt beispielsweise einen SQS-Ausfall-Dsetinierung zu hinzuMyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Der folgende UpdateEventSourceMapping CLI-Befehl fügt der Kafka-Ereignisquelle, die der Eingabe zugeordnet ist, ein S3-Ziel bei Ausfall hinzuuuid:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Um ein Ziel zu entfernen, geben Sie eine leere Zeichenfolge als Argument für den destination-config-Parameter an:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Hinzufügen eines Amazon MSK-Auslösers (Konsole)

Befolgen Sie diese Schritte, um Ihren Amazon-MSK-Cluster und ein Kafka-Thema als Auslöser für Ihre Lambda-Funktion hinzuzufügen.

So fügen Sie Ihrer Lambda-Funktion (Konsole) einen Amazon-MSK-Auslöser hinzu
  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie den Namen Ihrer Lambda-Funktion aus.

  3. Wählen Sie unter Function overview (Funktionsübersicht) die Option Add trigger (Trigger hinzufügen).

  4. Führen Sie unter Auslöser-Konfiguration die folgenden Schritte aus:

    1. Wählen Sie den MSK-Auslösertyp.

    2. Wählen Sie für MSK-Cluster Ihren Cluster aus.

    3. Geben Sie für Batchgröße die maximale Anzahl von Nachrichten ein, die in einem einzelnen Batch empfangen werden sollen.

    4. Geben Sie für Batch window (Batch-Fenster) die maximale Zeit in Sekunden ein, die Lambda mit dem Sammeln von Datensätzen verbringt, bevor die Funktion aufgerufen wird.

    5. Geben Sie für Themenname den Namen eines Kafka-Themas ein.

    6. (Optional) Geben Sie für Konsumentengruppen-ID die ID einer Kafka-Konsumentengruppe ein, der Sie beitreten möchten.

    7. (Optional) Wählen Sie für Startposition die Option Neueste, um mit dem Lesen des Streams aus dem letzten Datensatz zu beginnen, Horizont trimmen, um mit dem frühesten verfügbaren Datensatz zu beginnen, oder Am Zeitstempel, um einen Zeitstempel anzugeben, ab dem mit dem Lesen begonnen werden soll.

    8. (Optional) Wählen Sie bei Authentication (Authentifizierung) den Geheimschlüssel zur Authentifizierung bei den Brokern in Ihrem MSK-Cluster aus.

    9. Um den Auslöser zu Testzwecken in einem deaktivierten Zustand zu erstellen (empfohlen), deaktivieren Sie Auslöser aktivieren. Um den Auslöser sofort zu aktivieren, wählen Sie Auslöser aktivieren.

  5. Wählen Sie hinzufügen aus, um den Auslöser zu erstellen.

Hinzufügen eines Amazon-MSK-Auslösers (AWS CLI)

Verwenden Sie die folgenden AWS CLI Beispielbefehle, um einen Amazon-MSK-Auslöser für Ihre Lambda-Funktion zu erstellen und anzuzeigen.

Erstellen eines Auslösers mit der AWS CLI

Beispiel – Erstellen Sie eine Zuordnung von Ereignisquellen für einen Cluster, der die IAM-Authentifizierung verwendet

Im folgenden Beispiel wird der create-event-source-mapping AWS CLI Befehl verwendet, um eine Lambda-Funktion namens einem Kafka-my-kafka-functionThema namens zuzuordnenAWSKafkaTopic. Die Ausgangsposition des Themas ist auf LATEST festgelegt. Wenn der Cluster die rollenbasierte IAM-Authentifizierung verwendet, benötigen Sie kein -SourceAccessConfigurationObjekt. Beispiel:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Beispiel – Erstellen Sie eine Zuordnung von Ereignisquellen für einen Cluster, der die SASL/SCRAM-Authentifizierung verwendet

Wenn der Cluster die SASL/SCRAM-Authentifizierung verwendet, müssen Sie ein -SourceAccessConfigurationObjekt, das angibt, SASL_SCRAM_512_AUTH und einen Secrets-Manager-Geheimnis-ARN einschließen.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
Beispiel – Erstellen Sie eine Zuordnung von Ereignisquellen für einen Cluster, der die mTLS-Authentifizierung verwendet

Wenn der Cluster die mTLS-Authentifizierung verwendet, müssen Sie ein -SourceAccessConfigurationObjekt, das angibt, CLIENT_CERTIFICATE_TLS_AUTH und einen Secrets-Manager-Geheimnis-ARN einschließen.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

Weitere Informationen finden Sie in der APICreateEventSourceMapping-Referenzdokumentation.

Anzeigen des Status mithilfe der AWS CLI

Im folgenden Beispiel wird der -get-event-source-mapping AWS CLI Befehl verwendet, um den Status der Ereignisquellenzuordnung zu beschreiben, die Sie erstellt haben.

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Erstellen von kontenübergreifenden Zuordnungen von Ereignisquellen

Sie können private Multi-VPC-Konnektivität verwenden, um eine Lambda-Funktion mit einem bereitgestellten MSK-Cluster in einem anderen AWS-Konto zu verbinden. Multi-VPC-Konnektivität verwendet AWS PrivateLink, wodurch der gesamte Datenverkehr innerhalb des AWS Netzwerks bleibt.

Anmerkung

Sie können keine kontenübergreifenden Zuordnungen von Ereignisquellen für Serverless-MSK-Cluster erstellen.

Um eine kontenübergreifende Zuordnung von Ereignisquellen zu erstellen, müssen Sie zunächst die Multi-VPC-Konnektivität für den MSK-Cluster konfigurieren. Verwenden Sie beim Erstellen der Zuordnung von Ereignisquellen den ARN der verwalteten VPC-Verbindung anstelle des Cluster-ARNs, wie in den folgenden Beispielen gezeigt. Der CreateEventSourceMapping Vorgang unterscheidet sich auch je nachdem, welchen Authentifizierungstyp der MSK-Cluster verwendet.

Beispiel – Erstellen Sie eine kontoübergreifende Zuordnung von Ereignisquellen für einen Cluster, der die IAM-Authentifizierung verwendet

Wenn der Cluster die rollenbasierte IAM-Authentifizierung verwendet, benötigen Sie kein -SourceAccessConfigurationObjekt. Beispiel:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
Beispiel – Erstellen Sie eine kontoübergreifende Zuordnung von Ereignisquellen für einen Cluster, der die SASL/SCRAM-Authentifizierung verwendet

Wenn der Cluster die SASL/SCRAM-Authentifizierung verwendet, müssen Sie ein -SourceAccessConfigurationObjekt, das angibt, SASL_SCRAM_512_AUTH und einen Secrets-Manager-Geheimnis-ARN einschließen.

Es gibt zwei Möglichkeiten, Secrets für kontenübergreifende Zuordnungen von Amazon-MSK-Ereignisquellen mit SASL/SCRAM-Authentifizierung zu verwenden:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
Beispiel – Erstellen Sie eine kontoübergreifende Zuordnung von Ereignisquellen für einen Cluster, der die mTLS-Authentifizierung verwendet

Wenn der Cluster die mTLS-Authentifizierung verwendet, müssen Sie ein -SourceAccessConfigurationObjekt, das angibt, CLIENT_CERTIFICATE_TLS_AUTH und einen Secrets-Manager-Geheimnis-ARN einschließen. Das Secret kann im Clusterkonto oder im Lambda-Funktionskonto gespeichert werden.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Automatische Skalierung der Amazon-MSK-Ereignisquelle

Wenn Sie anfänglich eine Amazon-MSK-Ereignisquelle erstellen, weist Lambda einen Konsumenten zur Verarbeitung aller Partitionen im Kafka-Thema zu. Jeder Verbraucher hat mehrere Prozessoren, die parallel laufen, um erhöhte Workloads zu bewältigen. Lambda skaliert außerdem je nach Workload automatisch die Anzahl der Verbraucher nach oben oder unten. Um die Nachrichtenreihenfolge in jeder Partition beizubehalten, ist die maximale Anzahl von Verbrauchern pro Partition im Thema ein Verbraucher pro Partition.

In Intervallen von einer Minute wertet Lambda die Verbraucher-Offsetverzögerung aller Partitionen im Thema aus. Wenn die Verzögerung zu hoch ist, empfängt die Partition Nachrichten schneller als Lambda sie verarbeiten kann. Bei Bedarf fügt Lambda dem Thema Verbraucher hinzu oder entfernt sie. Der Skalierungsprozess zum Hinzufügen oder Entfernen von Verbrauchern erfolgt innerhalb von drei Minuten nach der Bewertung.

Wenn Ihre Lambda-Zielfunktion gedrosselt ist, verringert Lambda die Anzahl der Verbraucher. Diese Aktion reduziert die Workload für die Funktion, indem die Anzahl der Nachrichten reduziert wird, die Verbraucher abrufen und an die Funktion senden können.

Um den Durchsatz Ihres Kafka-Themas zu überwachen, sehen Sie sich die Offset-Verzögerungsmetrik an, die Lambda emittiert, während Ihre Funktion Datensätze verarbeitet.

Um zu überprüfen, wie viele Funktionsaufrufe parallel erfolgen, können Sie auch die Parallelitätsmetriken für Ihre Funktion überwachen.

Startpositionen für Abfragen und Streams

Beachten Sie, dass die Stream-Abfrage bei der Erstellung und Aktualisierung der Zuordnung von Ereignisquellen letztendlich konsistent ist.

  • Bei der Erstellung der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis mit der Abfrage von Ereignissen aus dem Stream begonnen wird.

  • Bei Aktualisierungen der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis die Abfrage von Ereignissen aus dem Stream gestoppt und neu gestartet wird.

Dieses Verhalten bedeutet, dass, wenn Sie LATEST als Startposition für den Stream angeben, die Zuordnung von Ereignisquellen bei der Erstellung oder Aktualisierung möglicherweise Ereignisse übersieht. Um sicherzustellen, dass keine Ereignisse übersehen werden, geben Sie die Startposition des Streams als TRIM_HORIZON oder AT_TIMESTAMP an.

Amazon- CloudWatch Metriken

Lambda gibt die Metrik OffsetLag aus, während Ihre Funktion Datensätze verarbeitet. Der Wert dieser Metrik ist die Differenz (der Versatz) zwischen dem letzten Datensatz, der ins Kafka-Ereignisquellen-Thema geschrieben wurde, und dem letzten Datensatz, den die Konsumentengruppe Ihrer Funktion verarbeitet hat. Sie können mit OffsetLag die Latenz zwischen dem Hinzufügen eines Datensatzes und der Verarbeitung des Datensatzes durch Ihre Konsumentengruppe abschätzen.

Ein zunehmender Trend in OffsetLag kann auf Probleme mit Pollern in der Konsumentengruppe Ihrer Funktion hinweisen. Weitere Informationen finden Sie unter Arbeiten mit Lambda-Funktionsmetriken.

Amazon-MSK-Konfigurationsparameter

Alle Lambda-Ereignisquellentypen haben dieselben - CreateEventSourceMapping und UpdateEventSourceMapping-API-Operationen. Allerdings gelten nur einige der Parameter für Amazon MSK.

Ereignisquellparameter, die für Amazon MSK gelten
Parameter Erforderlich Standard Hinweise

AmazonManagedKafkaEventSourceConfig

N

Enthält das ConsumerGroupId Feld , das standardmäßig einen eindeutigen Wert verwendet.

Kann nur auf „Erstellen“ festgelegt werden

BatchSize

N

100

Höchstwert: 10 000.

Enabled

N

Aktiviert

EventSourceArn

Y

Kann nur auf „Erstellen“ festgelegt werden

FunctionName

Y

FilterCriteria

N

Lambda-Ereignisfilterung

MaximumBatchingWindowInSeconds

N

500 ms

Batching-Verhalten

SourceAccessConfigurations

N

Keine Anmeldedaten

Anmeldeinformationen zur SASL/SCRAM- oder CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS)-Authentifizierung für Ihre Ereignisquelle

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON, oder LATEST

Kann nur auf „Erstellen“ festgelegt werden

StartingPositionTimestamp

N

Erforderlich, wenn auf AT_TIMESTAMP gesetzt StartingPosition ist

Themen

Y

Kafka-Thema-Name

Kann nur auf „Erstellen“ festgelegt werden