Verwendung AWS Lambda mit Amazon Kinesis - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwendung AWS Lambda mit Amazon Kinesis

Anmerkung

Wenn Sie Daten an ein anderes Ziel als eine Lambda-Funktion senden oder die Daten vor dem Senden anreichern möchten, finden Sie weitere Informationen unter Amazon EventBridge Pipes.

Sie können eine AWS Lambda Funktion verwenden, um Datensätze in einem Amazon Kinesis Kinesis-Datenstream zu verarbeiten.

Ein Kinesis-Daten-Stream ist eine Gruppe von Shards. Jeder Shard enthält eine Sequenz von Datensätzen. Ein Konsument ist eine Anwendung, die die Daten aus einem Kinesis-Daten-Stream verarbeitet. Sie können eine Lambda-Funktion zu einem Konsumenten mit gemeinsam genutztem Durchsatz (Standard-Iterator) oder zu einem Konsumenten mit dediziertem Durchsatz mit erweitertem Rundsenden zuweisen.

Bei Standard-Iteratoren fragt Lambda jeden Shard in Ihrem Kinesis-Stream nach Datensätzen ab, die das HTTP-Protokoll verwenden. Die Ereignisquellenzuordnung teilt den Lesedurchsatz mit anderen Konsumenten des Shards zusammen.

Um die Latenz zu minimieren und den Lesedurchsatz zu maximieren, können Sie einen Daten-Stream-Konsumenten mit erweitertem Rundsenden erstellen. Stream-Konsumenten erhalten Sie eine dedizierte Verbindung für jeden Shard, der keine Auswirkungen auf andere Anwendungen hat, die aus dem Stream lesen. Der dedizierte Durchsatz ist hilfreich, wenn viele Anwendungen die gleichen Daten lesen oder wenn ein Stream mit großen Datensätzen verarbeitet wird. Kinesis überträgt Datensätze über HTTP/2 nach Lambda.

Weitere Informationen zu Kinesis-Datenströmen finden Sie unter Daten aus Amazon Kinesis Data Streams.

Beispielereignis

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

Abfragen und Stapeln von Streams

Lambda liest Datensätze aus dem Datenstrom und ruft Ihre Funktion synchron mit einem Ereignis auf, das Stream-Datensätze enthält. Lambda liest Datensätze in Batches und ruft Ihre Funktion auf, um Datensätze aus dem Batch zu verarbeiten. Jeder Batch enthält Datensätze aus einem einzelnen Shard/Datenstrom.

Standardmäßig ruft Lambda Ihre Funktion auf, sobald Datensätze verfügbar sind. Wenn der Batch, den Lambda aus der Ereignisquelle liest, nur einen Datensatz enthält, sendet Lambda nur einen Datensatz an die Funktion. Damit die Funktion nicht mit einer kleinen Anzahl von Datensätzen aufgerufen wird, können Sie die Ereignisquelle anweisen, Datensätze bis zu 5 Minuten lang zu puffern, indem Sie ein Batch-Fenster konfigurieren. Bevor die Funktion aufgerufen wird, liest Lambda so lange Datensätze aus der Ereignisquelle, bis es einen vollständigen Batch erfasst hat, das Batch-Verarbeitungsfenster abläuft oder der Batch die Nutzlastgrenze von 6 MB erreicht. Weitere Informationen finden Sie unter Batching-Verhalten.

Warnung

Lambda-Ereignisquellenzuordnungen verarbeiten jedes Ereignis mindestens einmal, und es kann zu einer doppelten Verarbeitung von Datensätzen kommen. Um mögliche Probleme im Zusammenhang mit doppelten Ereignissen zu vermeiden, empfehlen wir Ihnen dringend, Ihren Funktionscode idempotent zu machen. Weitere Informationen finden Sie im Knowledge Center unter Wie mache ich meine Lambda-Funktion idempotent?. AWS

Wenn Ihre Funktion einen Fehler zurückgibt, wiederholt Lambda den Vorgang mit dem Batch, bis die Verarbeitung erfolgreich ist oder die Daten ablaufen. Um unterbrochenen Shards zu vermeiden, können Sie die Ereignisquellenzuordnung so konfigurieren, dass es mit einer kleineren Batchgröße wiederholt wird, die Anzahl der Wiederholungen beschränkt wird oder Datensätze verworfen werden, die zu alt sind. Um verworfene Ereignisse beizubehalten, können Sie die Zuordnung von Ereignisquellen so konfigurieren, dass Details zu fehlgeschlagenen Batches an eine SQS-Standardwarteschlange oder ein SNS-Standardthema gesendet werden.

Um die Nebenläufigkeit zu erhöhen, können Sie mehrere Batches aus jedem Shard parallel verarbeiten. Lambda kann bis zu 10 Batches in jedem Shard gleichzeitig verarbeiten. Wenn Sie die Anzahl gleichzeitiger Batches pro Shard erhöhen, stellt Lambda weiterhin die Auftragsverarbeitung auf Partitionsschlüsselebene sicher.

Konfigurieren Sie die ParallelizationFactorEinstellung so, dass ein Shard eines Kinesis- oder DynamoDB-Datenstroms mit mehr als einem Lambda-Aufruf gleichzeitig verarbeitet wird. Sie können die Anzahl der gleichzeitigen Batches angeben, die Lambda von einem Shard über einen Parallelisierungsfaktor von 1 (Standard) bis 10 abfragt. Wenn ParallelizationFactor beispielsweise auf 2 gesetzt ist, können Sie maximal 200 gleichzeitige Lambda-Aufrufe haben, um 100 Kinesis-Daten-Shards zu verarbeiten (in der Praxis werden womöglich andere Werte für die Metrik ConcurrentExecutions angezeigt). Dies hilft, den Verarbeitungsdurchsatz hochzuskalieren, wenn das Datenvolumen flüchtig ist und IteratorAge hoch ist.

Sie können die Aggregation auch ParallelizationFactor mit Kinesis verwenden. Das Verhalten der Ereignisquellenzuordnung hängt davon ab, ob Sie das erweiterte Fan-Out verwenden:

  • Ohne erweitertes Fan-Out: Alle Ereignisse innerhalb eines aggregierten Ereignisses müssen denselben Partitionsschlüssel haben. Der Partitionsschlüssel muss außerdem mit dem des aggregierten Ereignisses übereinstimmen. Wenn die Ereignisse innerhalb des aggregierten Ereignisses unterschiedliche Partitionsschlüssel haben, kann Lambda nicht garantieren, dass die Ereignisse in der richtigen Reihenfolge nach Partitionsschlüsseln verarbeitet werden.

  • Mit verbessertem Fan-Out: Zunächst dekodiert Lambda das aggregierte Ereignis in seine einzelnen Ereignisse. Das aggregierte Ereignis kann einen anderen Partitionsschlüssel haben als die darin enthaltenen Ereignisse. Ereignisse, die nicht dem Partitionsschlüssel entsprechen, werden jedoch gelöscht und gehen verloren. Lambda verarbeitet diese Ereignisse nicht und sendet sie nicht an ein konfiguriertes Fehlerziel.

Abfrage und Startposition des Streams

Beachten Sie, dass die Stream-Abfrage bei der Erstellung und Aktualisierung der Zuordnung von Ereignisquellen letztendlich konsistent ist.

  • Bei der Erstellung der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis mit der Abfrage von Ereignissen aus dem Stream begonnen wird.

  • Bei Aktualisierungen der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis die Abfrage von Ereignissen aus dem Stream gestoppt und neu gestartet wird.

Dieses Verhalten bedeutet, dass, wenn Sie LATEST als Startposition für den Stream angeben, die Zuordnung von Ereignisquellen bei der Erstellung oder Aktualisierung möglicherweise Ereignisse übersieht. Um sicherzustellen, dass keine Ereignisse übersehen werden, geben Sie die Startposition des Streams als TRIM_HORIZON oder AT_TIMESTAMP an.

Konfigurieren Ihres Daten-Streams und Ihrer Funktion

Ihre Lambda-Funktion ist eine Konsumentenanwendung für Ihren Daten-Stream. Sie verarbeitet jeweils einen Batch Datensätzen aus jedem Shard. Sie können eine Lambda-Funktion zu einem Daten-Stream (Standard-Iterator) oder zu einem Konsumenten eines Streams (erweitertes Rundsenden) zuweisen.

Für Standard-Iteratoren fragt Lambda jeden Shard in Ihrem Kinesis-Stream nach Datensätzen ab, bei einer Basisrate von einmal pro Sekunde. Wenn mehr Datensätze verfügbar sind, verarbeitet Lambda Batches, bis die Funktion mit dem Stream gleichzieht. Die Ereignisquellenzuordnung teilt den Lesedurchsatz mit anderen Konsumenten des Shards zusammen.

Um die Latenz zu minimieren und den Lesedurchsatz zu maximieren, erstellen Sie einen Daten-Stream-Konsumenten mit erweitertem Rundsenden. Stream-Konsumenten mit erweitertem Rundsenden erhalten eine dedizierte Verbindung für jeden Shard, der keine Auswirkungen auf andere Anwendungen hat, die aus dem Stream lesen. Stream-Konsumenten verwenden HTTP/2, um die Latenz zu reduzieren, indem Datensätze über eine langlebige Verbindung an Lambda übertragen und Anforderungs-Header komprimiert werden. Sie können mit der RegisterStreamConsumerKinesis-API einen Stream-Consumer erstellen.

aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Die Ausgabe sollte folgendermaßen aussehen:

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

Um die Geschwindigkeit zu erhöhen, mit der Ihre Funktion Datensätze verarbeitet, fügen Sie dem Daten-Stream Shards hinzu. Lambda verarbeitet Datensätze in jedem Shard in der Reihenfolge. Es beendet die Verarbeitung zusätzlicher Datensätze in einem Shard, wenn Ihre Funktion einen Fehler zurückgibt. Mehr Shards bedeutet, dass mehr Stapel verarbeitet und gleichzeitig die Auswirkungen von Fehlern auf die Nebenläufigkeit verringert werden.

Wenn Ihre Funktion nicht hochskalieren kann, um alle gleichzeitigen Stapel zu verarbeiten, fordern Sie eine Kontingenterhöhung an oder reservieren Sie Gleichzeitigkeit für Ihre Funktion.

Erstellen Sie eine Ereignisquellenzuordnung, um eine Lambda-Funktion aufzurufen

Um Ihre Lambda-Funktion mit Datensätzen aus Ihrem Datenstrom aufzurufen, erstellen Sie eine Ereignisquellenzuordnung. Sie können mehrere Ereignisquellenzuordnungen erstellen, um gleiche Daten mit mehreren Lambda-Funktionen oder Elemente aus mehreren Daten-Streams mit nur einer Funktion zu verarbeiten. Bei der Verarbeitung von Elementen aus mehreren Streams enthält jeder Batch nur Datensätze aus einem einzigen Shard oder Stream.

Sie können Zuordnungen von Ereignisquellen konfigurieren, um Datensätze aus einem Stream in einem anderen zu verarbeiten. AWS-Konto Weitere Informationen hierzu finden Sie unter Erstellen einer kontenübergreifenden Zuordnung von Ereignisquellen.

Bevor Sie eine Ereignisquellenzuordnung erstellen, müssen Sie Ihrer Lambda-Funktion die Erlaubnis erteilen, aus einem Kinesis-Datenstream zu lesen. Lambda benötigt die folgenden Berechtigungen, um Ressourcen im Zusammenhang mit Ihrem Kinesis-Datenstrom zu verwalten:

Die AWS verwaltete Richtlinie AWSLambdaKinesisExecutionRoleumfasst diese Berechtigungen. Fügen Sie diese verwaltete Richtlinie zu Ihrer Funktion hinzu, wie im folgenden Verfahren beschrieben.

AWS Management Console
So fügen Sie Ihrer Funktion Kinesis-Berechtigungen hinzu
  1. Öffnen Sie die Funktionsseite der Lambda-Konsole und wählen Sie Ihre Funktion aus.

  2. Wählen Sie auf der Registerkarte Konfiguration die Option Berechtigungen aus.

  3. Wählen Sie im Bereich Ausführungsrolle unter Rollenname den Link zur Ausführungsrolle Ihrer Funktion aus. Dieser Link öffnet die Seite für diese Rolle in der IAM-Konsole.

  4. Wählen Sie im Bereich „Berechtigungsrichtlinien“ die Option „Berechtigungen hinzufügen“ und anschließend „Richtlinien anhängen“ aus.

  5. Geben Sie im Suchfeld AWSLambdaKinesisExecutionRole ein.

  6. Aktivieren Sie das Kontrollkästchen neben der Richtlinie und wählen Sie „Berechtigung hinzufügen“ aus.

AWS CLI
So fügen Sie Ihrer Funktion Kinesis-Berechtigungen hinzu
  • Führen Sie den folgenden CLI-Befehl aus, um die AWSLambdaKinesisExecutionRole Richtlinie zur Ausführungsrolle Ihrer Funktion hinzuzufügen:

    aws iam attach-role-policy --role-name MyFunctionRole \ --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
AWS SAM
So fügen Sie Ihrer Funktion Kinesis-Berechtigungen hinzu
  • Fügen Sie der Definition Ihrer Funktion die Policies Eigenschaft hinzu, wie im folgenden Beispiel gezeigt:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs20.x Policies: - AWSLambdaKinesisExecutionRole

Nachdem Sie die erforderlichen Berechtigungen konfiguriert haben, erstellen Sie die Zuordnung der Ereignisquelle.

AWS Management Console
So erstellen Sie das Kinesis-Ereignisquellen-Mapping
  1. Öffnen Sie die Funktionsseite der Lambda-Konsole und wählen Sie Ihre Funktion aus.

  2. Wählen Sie im Bereich Function overview (Funktionsübersicht) die Option Add trigger (Auslöser hinzufügen).

  3. Wählen Sie unter Trigger-Konfiguration für die Quelle Kinesis aus.

  4. Wählen Sie den Kinesis-Stream aus, für den Sie das Event-Quellen-Mapping erstellen möchten, und optional einen Consumer Ihres Streams.

  5. (Optional) Bearbeiten Sie die Stapelgröße, die Startposition und das Stapelfenster für Ihre Ereignisquellenzuordnung.

  6. Wählen Sie Hinzufügen aus.

Wenn Sie Ihre Ereignisquellenzuordnung von der Konsole aus erstellen, muss Ihre IAM-Rolle über die Berechtigungen kinesis: ListStreams und kinesis: verfügen. ListStreamConsumers

Wenn Sie die Zuordnung der Ereignisquellen nicht erstellen können, wenden Sie sich an Ihren Systemadministrator, um zu überprüfen, ob Sie über die erforderlichen Berechtigungen verfügen.

AWS CLI
So erstellen Sie das Kinesis-Ereignisquellen-Mapping
  • Führen Sie den folgenden CLI-Befehl aus, um eine Kinesis-Ereignisquellenzuordnung zu erstellen. Wählen Sie Ihre eigene Batchgröße und Startposition entsprechend Ihrem Anwendungsfall.

    aws lambda create-event-source-mapping --function-name MyFunction \ --event-source-arn arn:aws:kinesis:us-west-2:123456789012:stream/MyKinesisStream \ --starting-position LATEST --batch-size 100

Um zusätzlich ein Batching-Fenster anzugeben, fügen Sie die --maximum-batching-window-in-seconds Option hinzu. Weitere Informationen zur Verwendung dieses CLI-Befehls finden Sie create-event-source-mappingin der AWS CLI Befehlsreferenz.

AWS SAM
So erstellen Sie das Kinesis-Ereignisquellen-Mapping
  • Fügen Sie der Definition Ihrer Funktion die KinesisEvent Eigenschaft hinzu, wie im folgenden Beispiel gezeigt:

    Resources: MyFunction: Type: AWS::Serverless::Function Properties: CodeUri: ./my-function/ Handler: index.handler Runtime: nodejs20.x Policies: - AWSLambdaKinesisExecutionRole Events: KinesisEvent: Type: Kinesis Properties: Stream: !GetAtt MyKinesisStream.Arn StartingPosition: LATEST BatchSize: 100 MyKinesisStream: Type: AWS::Kinesis::Stream Properties: ShardCount: 1

Weitere Informationen zum Erstellen einer Ereignisquellenzuordnung für Kinesis finden Sie unter Kinesis im AWS Serverless Application Model Entwicklerhandbuch. AWS SAM

Erstellen einer kontenübergreifenden Zuordnung von Ereignisquellen

Amazon Kinesis Data Streams unterstützt ressourcenbasierte Richtlinien. Aus diesem Grund können Sie in einen Stream aufgenommene Daten in einem Konto AWS-Konto mit einer Lambda-Funktion in einem anderen Konto verarbeiten.

Um eine Ereignisquellenzuordnung für Ihre Lambda-Funktion mithilfe eines Kinesis-Streams in einem anderen zu erstellen AWS-Konto, müssen Sie den Stream mithilfe einer ressourcenbasierten Richtlinie konfigurieren, um Ihrer Lambda-Funktion die Berechtigung zum Lesen von Elementen zu erteilen. Informationen dazu, wie Sie Ihren Stream so konfigurieren, dass er kontoübergreifenden Zugriff ermöglicht, finden Sie unter Zugriff mit kontoübergreifenden AWS Lambda Funktionen teilen im Amazon Kinesis Streams Streams-Entwicklerhandbuch.

Nachdem Sie Ihren Stream mit einer ressourcenbasierten Richtlinie konfiguriert haben, die Ihrer Lambda-Funktion die erforderlichen Berechtigungen erteilt, erstellen Sie die Ereignisquellenzuordnung mit einer der im vorherigen Abschnitt beschriebenen Methoden.

Wenn Sie Ihr Event-Quellen-Mapping mit der Lambda-Konsole erstellen möchten, fügen Sie den ARN Ihres Streams direkt in das Eingabefeld ein. Wenn Sie einen Verbraucher für Ihren Stream angeben möchten, wird das Stream-Feld durch Einfügen des ARN des Verbrauchers automatisch aufgefüllt.

Optionen für die Ereignisquelle

Lambda unterstützt die folgenden Optionen für Kinesis-Ereignisquellen:

Optionen für die Ereignisquelle
  • Kinesis-Stream Der Kinesis-Stream, aus dem Datensätze gelesen werden sollen.

  • Konsument (optional) – Verwenden Sie einen Stream-Konsumenten, um über eine dedizierte Verbindung aus dem Stream zu lesen.

  • Batchgröße – Die Anzahl der Datensätze, die in jedem Batch bis zu 10.000 an die Funktion gesendet werden sollen. Lambda übergibt alle Datensätze im Batch in einem einzigen Aufruf an die Funktion, solange die Gesamtgröße der Ereignisse nicht das Nutzlast-Limit für synchrone Aufrufe überschreitet (6 MB).

  • Batchfenster – Geben Sie die maximale Zeitspanne zur Erfassung von Datensätzen vor dem Aufruf der Funktion in Sekunden an.

  • Startposition – Verarbeiten Sie nur neue Datensätze, alle vorhandenen Datensätze oder Datensätze, die nach einem bestimmten Datum erstellt wurden.

    • Neueste – Verarbeiten Sie Datensätze, die neu zum Stream hinzugefügt wurden.

    • Horizont trimmen – Verarbeiten Sie alle Datensätze im Stream.

    • Am Zeitstempel – Verarbeiten Sie Datensätze ab einem bestimmten Zeitpunkt.

    Nach der Verarbeitung aller vorhandenen Datensätze hat die Funktion aufgeholt und setzt die Verarbeitung neuer Datensätze fort.

  • On-failure destination (Ziel bei Ausfall) – Eine SQS-Warteschlange oder ein SNS-Thema für Datensätze, die nicht verarbeitet werden können. Wenn Lambda einen Datensatz-Batch verwirft, weil er zu alt ist oder alle Wiederholungen erschöpft hat, sendet es Details zum Batch an die Warteschlange oder das Thema.

  • Wiederholungsversuche – Die maximale Anzahl von Wiederholungen von Lambda, wenn die Funktion einen Fehler zurückgibt. Dies gilt nicht für Servicefehler oder Drosselungen, bei denen der Batch die Funktion nicht erreicht hat.

  • Höchstalter des Datensatzes – Das maximale Alter eines Datensatzes, den Lambda an Ihre Funktion sendet.

  • Batch bei Fehler aufteilen – Wenn die Funktion einen Fehler zurückgibt, teilen Sie den Batch vor dem erneuten Versuch in zwei Teile. Ihre ursprüngliche Einstellung für die Batch-Größe bleibt unverändert.

  • Gleichzeitige Batches pro Shard – Verarbeitet gleichzeitig mehrere Batches aus demselben Shard.

  • Aktiviert – Auf „true“ festlegen, um die Ereignisquellenzuordnung zu aktivieren. Auf "false" festlegen, um die Verarbeitung von Datensätzen zu beenden. Lambda merkt sich den zuletzt verarbeiteten Datensatz und setzt die Verarbeitung nach erneuter Aktivierung an dieser Stelle fort.

Anmerkung

Kinesis berechnet Gebühren für jeden Shard, sowie bei verbessertem Rundsenden für Daten, die aus dem Stream gelesen werden. Details zu den Preisen finden Sie unter Amazon-Kinesis- Preise.

Um die Konfiguration der Ereignisquelle zu einem späteren Zeitpunkt zu verwalten, wählen Sie den Auslöser im Designer aus.

Filtern von Kinesis-Ereignissen

Wenn Sie Kinesis als Ereignisquelle für Lambda konfigurieren, können Sie mithilfe von Ereignisfilterung steuern, welche Datensätze von Ihrem Stream Lambda zur Verarbeitung an Ihre Funktion sendet. Weitere Informationen zur Verwendung der Lambda-Ereignisfilterung mit Kinesis finden Sie unter Filtern mit Kinesis.

API für die Ereignisquellenzuordnung

Um eine Ereignisquelle mit der AWS Command Line Interface (AWS CLI) oder einem AWS-SDK zu verwalten, können Sie die folgenden API-Operationen verwenden:

Verwenden Sie den Befehl, um die Zuordnung der Ereignisquelle mit dem AWS CLI zu erstellen. create-event-source-mapping Im folgenden Beispiel wird die verwendet AWS CLI , um eine benannte Funktion einem Kinesis-Datenstrom my-function zuzuordnen. Der Daten-Stream wird mit dem Amazon-Ressourcennamen (ARN) angegeben, mit einer Stapelgröße von 500, beginnend ab dem Zeitstempel in Unix-Zeit.

aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Die Ausgabe sollte folgendermaßen aussehen:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Um einen Konsumenten zu verwenden, geben Sie den ARN des Konsumenten anstelle des ARN des Streams an.

Konfigurieren Sie zusätzliche Optionen, um die Verarbeitung von Batches anzupassen und um anzugeben, wann nicht verarbeitete Datensätze verworfen werden sollen. Im folgenden Beispiel wird eine Ereignisquellenzuordnung aktualisiert, um nach zwei Wiederholungsversuchen einen Fehlerdatensatz an eine SQS-Standardwarteschlange zu senden oder wenn die Datensätze älter als eine Stunde sind.

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

Sie sollten diese Ausgabe sehen:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Aktualisierte Einstellungen werden asynchron angewendet und werden erst nach Abschluss des Vorgangs in der Ausgabe berücksichtigt. Verwenden Sie den get-event-source-mapping-Befehl, um den aktuellen Status anzuzeigen.

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

Sie sollten diese Ausgabe sehen:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

Um mehrere Batches gleichzeitig zu verarbeiten, verwenden Sie die --parallelization-factor-Option.

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

Fehlerbehandlung

Die Ereignisquellenzuordnung, die Datensätze aus Ihrem Kinesis-Stream liest, ruft Ihre Funktion synchron auf und versucht es bei Fehlern erneut. Wenn Lambda die Funktion drosselt oder einen Fehler zurückgibt, ohne die Funktion aufzurufen, versucht es Lambda erneut, bis die Datensätze ablaufen oder das maximale Alter überschreiten, das Sie in der Ereignisquellenzuordnung konfigurieren.

Wenn die Funktion die Datensätze empfängt, aber einen Fehler zurückgibt, versucht Lambda so lange, bis die Datensätze im Batch ablaufen, das maximale Alter überschreiten oder das konfigurierte Wiederholungskontingent erreicht haben. Bei Funktionsfehlern können Sie auch die Ereignisquellenzuordnung so konfigurieren, dass ein fehlgeschlagener Batch in zwei Batches aufgeteilt wird. Bei einem erneuten Versuch mit kleineren Batches werden fehlerhafte Datensätze isoliert und Zeitüberschreitungsprobleme umgangen. Das Teilen eines Batches wird nicht auf das Wiederholungskontingent angerechnet.

Wenn die Fehlerbehandlungsmaßnahmen fehlschlagen, verwirft Lambda die Datensätze und setzt die Verarbeitung von Batches aus dem Stream fort. Bei den Standardeinstellungen bedeutet dies, dass ein fehlerhafter Datensatz die Verarbeitung auf dem betroffenen Shard für bis zu eine Woche blockieren kann. Um dies zu vermeiden, konfigurieren Sie die Ereignisquellenzuordnung Ihrer Funktion mit einer angemessenen Anzahl von Wiederholungen und einem maximalen Datensatzalter, das zu Ihrem Anwendungsfall passt.

Um eine Aufzeichnung verworfener Batches beizubehalten, konfigurieren Sie ein Ziel für fehlgeschlagene Ereignisse. Lambda sendet ein Dokument mit Details zum Batch an die Zielwarteschlange oder das Zielthema.

So konfigurieren Sie ein Ziel für Datensätze mit fehlerhaften Ereignissen
  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie eine Funktion aus.

  3. Wählen Sie unter Function overview (Funktionsübersicht) die Option Add destination (Ziel hinzufügen).

  4. Wählen Sie in Source (Quelle) die Option Stream invocation (Stream-Aufruf) aus.

  5. Wählen Sie in Stream (Stream) einen der Funktion zugeordneten Stream aus.

  6. Wählen Sie unter Destination type (Zieltyp) den Ressourcentyp aus, der den Aufrufdatensatz empfängt.

  7. Wählen Sie unter Destination (Ziel) eine Ressource aus.

  8. Wählen Sie Save aus.

Das folgende Beispiel zeigt einen Aufrufdatensatz für einen Kinesis-Stream.

Beispiel Aufrufdatensatzglauben
{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

Sie können diese Informationen verwenden, um die betroffenen Datensätze aus dem Stream für die Fehlersuche abzurufen. Die tatsächlichen Datensätze sind nicht enthalten, daher müssen Sie diesen Datensatz verarbeiten und aus dem Stream abrufen, bevor sie ablaufen und verloren gehen.

CloudWatch Amazon-Metriken

Lambda gibt die IteratorAge-Metrik aus, wenn Ihre Funktion die Verarbeitung eines Batches von Datensätzen fertigstellt. Die Metrik gibt an, wie alt der letzte Datensatz im Batch bei Fertigstellung der Verarbeitung war. Wenn Ihre Funktion neue Ereignisse verarbeitet, können Sie mit dem Iterator-Alter die Latenz zwischen dem Zeitpunkt, zu dem ein Datensatz hinzugefügt wird und dem Zeitpunkt, zu dem die Funktion verarbeitet wird, schätzen.

Eine steigende Tendenz beim Iterator-Alter kann auf Probleme mit Ihrer Funktion hindeuten. Weitere Informationen finden Sie unter Arbeiten mit Lambda-Funktionsmetriken.

Zeitfenster

Lambda-Funktionen können kontinuierliche Stream-Verarbeitungsanwendungen ausführen. Ein Stream entspricht einer unbegrenzten Menge von Daten, die kontinuierlich durch Ihre Anwendung fließen. Um Informationen aus dieser sich ständig aktualisierenden Eingabe zu analysieren, können Sie die enthaltenen Datensätze mithilfe eines zeitlich definierten Fensters binden.

Rollierende Fenster sind unterschiedliche Zeitfenster, die sich in regelmäßigen Abständen öffnen und schließen. Standardmäßig sind Lambda-Aufrufe zustandslos – Sie können sie nicht für die Verarbeitung von Daten über mehrere kontinuierliche Aufrufe hinweg ohne eine externe Datenbank verwenden. Mit rollierenden Fenstern können Sie jedoch Ihren Status über Aufrufe hinweg beibehalten. Dieser Zustand enthält das Gesamtergebnis der Nachrichten, die zuvor für das aktuelle Fenster verarbeitet wurden. Ihr Zustand kann maximal 1 MB pro Shard betragen. Wenn er diese Größe überschreitet, wird Lambda das Fenster vorzeitig beenden.

Jeder Datensatz in einem Stream gehört zu einem bestimmten Fenster. Lambda verarbeitet jeden Datensatz mindestens einmal, garantiert jedoch nicht, dass jeder Datensatz nur einmal verarbeitet wird. In seltenen Fällen, etwa bei der Fehlerbehandlung, werden einige Datensätze möglicherweise mehrmals verarbeitet. Datensätze werden beim ersten Mal immer in der richtigen Reihenfolge verarbeitet. Wenn Datensätze mehr als einmal verarbeitet werden, werden sie nicht in der richtigen Reihenfolge verarbeitet.

Aggregation und Verarbeitung

Ihre benutzerverwaltete Funktion wird sowohl zur Aggregation als auch zur Verarbeitung der Endergebnisse dieser Aggregation aufgerufen. Lambda aggregiert alle im Fenster empfangenen Datensätze. Sie können diese Datensätze in mehreren Stapeln erhalten, jeweils als ein separater Aufruf. Jeder Aufruf erhält einen Zustand. Wenn Sie also rollierende Fenster verwenden, muss Ihre Lambda-Funktionsantwort eine state-Eigenschaft enthalten. Wenn die Antwort keine state-Eigenschaft enthält, betrachtet Lambda dies als fehlgeschlagenen Aufruf. Um diese Bedingung zu erfüllen, kann Ihre Funktion ein TimeWindowEventResponse-Objekt zurückgeben, das die folgende JSON-Form aufweist:

Beispiel TimeWindowEventResponse-Werte
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Anmerkung

Für Java-Funktionen empfehlen wir, eine Map<String, String> zu verwenden, um den Status darzustellen.

Am Ende des Fensters wird das Flag isFinalInvokeForWindow auf true gesetzt, um anzugeben, dass es sich um den Endzustand handelt und dass es für die Verarbeitung bereit ist. Nach der Verarbeitung werden das Fenster und Ihr endgültiger Aufruf wird abgeschlossen, und dann wird der Zustand gelöscht.

Am Ende Ihres Fensters verwendet Lambda die endgültige Verarbeitung für Aktionen an den Aggregationsergebnissen. Ihre endgültige Verarbeitung wird synchron aufgerufen. Nach erfolgreichem Aufruf zeigt Ihre Funktion auf die Sequenznummer und die Stream-Verarbeitung wird fortgesetzt. Wenn der Aufruf nicht erfolgreich ist, unterbricht Ihre Lambda-Funktion die weitere Verarbeitung bis zu einem erfolgreichen Aufruf.

Beispiel KinesisTimeWindowEvent
{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Konfiguration

Sie können rollierende Fenster konfigurieren, wenn Sie eine Ereignisquellenzuordnung erstellen oder aktualisieren. Um ein rollierendes Fenster zu konfigurieren, geben Sie das Fenster in Sekunden an. Der folgende Befehl example AWS Command Line Interface (AWS CLI) erstellt eine Quellenzuordnung für Streaming-Ereignisse mit einem Zeitfenster von 120 Sekunden. Die für Aggregation und Verarbeitung definierte Lambda-Funktion wird tumbling-window-example-function genannt.

aws lambda create-event-source-mapping --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda bestimmt die rollierenden Fenstergrenzen basierend auf dem Zeitpunkt, zu dem Datensätze in den Stream eingefügt wurden. Für alle Datensätze steht ein ungefährer Zeitstempel zur Verfügung, den Lambda in Grenzbestimmungen verwendet.

Rollierende Fensteraggregationen unterstützen kein Resharding. Wenn der Shard endet, betrachtet Lambda das Fenster als geschlossen und die untergeordneten Shards beginnen ihr eigenes Fenster in einem neuen Zustand.

Rollierende Fenster unterstützen vollständig die bestehenden Wiederholungsrichtlinien maxRetryAttempts und maxRecordAge.

Beispiel Handler.py – Aggregation und Verarbeitung

Die folgende Python-Funktion veranschaulicht, wie Sie Ihren Endzustand aggregieren und dann verarbeiten:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}

Melden von Batch-Elementen

Beim Konsumieren und Verarbeiten von Streaming-Daten aus einer Ereignisquelle werden standardmäßig Lambda-Checkpoints auf die höchste Sequenznummer eines Batches nur dann überprüft, wenn der Batch ein voller Erfolg ist. Lambda behandelt alle anderen Ergebnisse als einen vollständigen Fehler und versucht, den Batch bis zum Wiederholungslimit zu verarbeiten. Um beim Verarbeiten von Stapeln aus einem Stream Teilerfolge zu ermöglichen, aktivieren Sie ReportBatchItemFailures. Das Zulassen von Teilerfolgen kann dazu beitragen, die Anzahl der Wiederholungen in einer Aufzeichnung zu reduzieren, obwohl die Möglichkeit von Wiederholungen in einer erfolgreichen Aufzeichnung nicht vollständig verhindert wird.

Zum Aktivieren von ReportBatchItemFailures fügen Sie den Enum-Wert ReportBatchItemFailures der FunctionResponseTypes-Liste hinzu. Diese Liste zeigt an, welche Antworttypen für Ihre Funktion aktiviert sind. Sie können diese Liste konfigurieren, wenn Sie eine Ereignisquellenzuordnung erstellen oder aktualisieren.

Berichtsyntax

Beim Konfigurieren von Berichten zu Batch-Elementfehlern wird die StreamsEventResponse-Klasse mit einer Liste von Batch-Elementfehlern zurückgegeben. Sie können ein StreamsEventResponse-Objekt verwenden, um die Sequenznummer des ersten fehlgeschlagenen Datensatzes im Batch zurückzugeben. Sie können auch Ihre eigene benutzerdefinierte Klasse mit der richtigen Antwortsyntax erstellen. Die folgende JSON-Struktur zeigt die erforderliche Antwortsyntax:

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
Anmerkung

Wenn das batchItemFailures-Array mehrere Elemente enthält, verwendet Lambda den Datensatz mit der niedrigsten Sequenznummer als Kontrollpunkt. Lambda wiederholt dann alle Datensätze ab diesem Kontrollpunkt.

Erfolgs- und Misserfolgsbedingungen

Lambda behandelt einen Batch als vollständigen Erfolg, wenn Sie eines der folgenden Elemente zurückgeben:

  • Eine leere batchItemFailure-Liste

  • Eine ungültige batchItemFailure-Liste

  • Ein leeres EventResponse

  • Ein ungültiges EventResponse

Lambda behandelt einen Batch als vollständigen Misserfolg, wenn Sie eines der folgenden Elemente zurückgeben:

  • Eine leere Zeichenfolge itemIdentifier

  • Ein ungültiges itemIdentifier

  • Ein itemIdentifier mit einem falschen Schlüsselnamen

Lambda wiederholt Fehler basierend auf Ihrer Wiederholungsstrategie.

Einen Batch halbieren

Wenn Ihr Aufruf fehlschlägt und BisectBatchOnFunctionError eingeschaltet ist, wird der Stapel unabhängig von Ihrer ReportBatchItemFailures-Einstellung halbiert.

Wenn eine partielle Batch-Erfolgsantwort empfangen wird und sowohl BisectBatchOnFunctionError als auch ReportBatchItemFailures aktiviert sind, wird der Batch mit der zurückgegebenen Sequenznummer halbiert und Lambda versucht nur die verbleibenden Datensätze erneut.

Hier sind einige Beispiele für Funktionscodes, die die Liste der fehlgeschlagenen Nachrichten-IDs im Batch zurückgeben:

.NET
AWS SDK for .NET
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using System.Text.Json.Serialization; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegration; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return new StreamsEventResponse(); } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return new StreamsEventResponse { BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure> { new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber } } }; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); return new StreamsEventResponse(); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } } public class StreamsEventResponse { [JsonPropertyName("batchItemFailures")] public IList<BatchItemFailure> BatchItemFailures { get; set; } public class BatchItemFailure { [JsonPropertyName("itemIdentifier")] public string ItemIdentifier { get; set; } } }
Go
SDK für Go V2
Anmerkung

Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern Kinesis Kinesis-Batch-Artikeln mit Lambda mithilfe von Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, record := range kinesisEvent.Records { curRecordSequenceNumber := "" // Process your record if /* Your record processing condition here */ { curRecordSequenceNumber = record.Kinesis.SequenceNumber } // Add a condition to check if the record processing failed if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber}) } } kinesisBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return kinesisBatchResponse, nil } func main() { lambda.Start(handler) }
Java
SDK für Java 2.x
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> { @Override public StreamsEventResponse handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
JavaScript
SDK für JavaScript (v3)
Anmerkung

Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Javascript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { for (const record of event.Records) { try { console.log(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); console.log(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { console.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } console.log(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync(payload) { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }

Melden von Fehlern Kinesis Kinesis-Batch-Elementen mit Lambda unter Verwendung von. TypeScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { KinesisStreamEvent, Context, KinesisStreamHandler, KinesisStreamRecordPayload, KinesisStreamBatchResponse, } from "aws-lambda"; import { Buffer } from "buffer"; import { Logger } from "@aws-lambda-powertools/logger"; const logger = new Logger({ logLevel: "INFO", serviceName: "kinesis-stream-handler-sample", }); export const functionHandler: KinesisStreamHandler = async ( event: KinesisStreamEvent, context: Context ): Promise<KinesisStreamBatchResponse> => { for (const record of event.Records) { try { logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`); const recordData = await getRecordDataAsync(record.kinesis); logger.info(`Record Data: ${recordData}`); // TODO: Do interesting work based on the new data } catch (err) { logger.error(`An error occurred ${err}`); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return { batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }], }; } } logger.info(`Successfully processed ${event.Records.length} records.`); return { batchItemFailures: [] }; }; async function getRecordDataAsync( payload: KinesisStreamRecordPayload ): Promise<string> { var data = Buffer.from(payload.data, "base64").toString("utf-8"); await Promise.resolve(1); //Placeholder for actual async work return data; }
PHP
SDK für PHP
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern Kinesis Kinesis-Batch-Elementen mit Lambda mithilfe von PHP.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\Kinesis\KinesisEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $kinesisEvent = new KinesisEvent($event); $this->logger->info("Processing records"); $records = $kinesisEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK für Python (Boto3)
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern bei Kinesis-Batchelementen mit Lambda unter Verwendung von Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Ruby
SDK für Ruby
Anmerkung

Es gibt noch mehr GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern Kinesis Kinesis-Batch-Elementen mit Lambda mithilfe von Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'aws-sdk' def lambda_handler(event:, context:) batch_item_failures = [] event['Records'].each do |record| begin puts "Processed Kinesis Event - EventID: #{record['eventID']}" record_data = get_record_data_async(record['kinesis']) puts "Record Data: #{record_data}" # TODO: Do interesting work based on the new data rescue StandardError => err puts "An error occurred #{err}" # Since we are working with streams, we can return the failed item immediately. # Lambda will immediately begin to retry processing from this failed item onwards. return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] } end end puts "Successfully processed #{event['Records'].length} records." { batchItemFailures: batch_item_failures } end def get_record_data_async(payload) data = Base64.decode64(payload['data']).force_encoding('utf-8') # Placeholder for actual async work sleep(1) data end
Rust
SDK für Rust
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von Fehlern Kinesis Kinesis-Batch-Elementen mit Lambda mithilfe von Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::kinesis::KinesisEvent, kinesis::KinesisEventRecord, streams::{KinesisBatchItemFailure, KinesisEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> { let mut response = KinesisEventResponse { batch_item_failures: vec![], }; if event.payload.records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in &event.payload.records { tracing::info!( "EventId: {}", record.event_id.as_deref().unwrap_or_default() ); let record_processing_result = process_record(record); if record_processing_result.is_err() { response.batch_item_failures.push(KinesisBatchItemFailure { item_identifier: record.kinesis.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!( "Successfully processed {} records", event.payload.records.len() ); Ok(response) } fn process_record(record: &KinesisEventRecord) -> Result<(), Error> { let record_data = std::str::from_utf8(record.kinesis.data.as_slice()); if let Some(err) = record_data.err() { tracing::error!("Error: {}", err); return Err(Error::from(err)); } let record_data = record_data.unwrap_or_default(); // do something interesting with the data tracing::info!("Data: {}", record_data); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Konfigurationsparameter zu Amazon Kinesis

Alle Lambda-Ereignisquelltypen verwenden dieselben CreateEventSourceMappingUpdateEventSourceMappingAPI-Operationen. Allerdings gelten nur einige der Parameter für Kinesis.

Ereignisquellparameter, die für Kinesis gelten
Parameter Erforderlich Standard Hinweise

BatchSize

N

100

Höchstwert: 10 000.

BisectBatchOnFunctionError

N

false

DestinationConfig

N

Ein Ziel der Amazon-SQS-Standardwarteschlange oder des Amazon-SNS-Standardthemas für verworfene Datensätze

Aktiviert

N

true

EventSourceArn

Y

Der ARN des Datenstroms oder eines Stream-Konsumenten

FunctionName

Y

FunctionResponseTypes

N

Damit Ihre Funktion bestimmte Fehler in einem Batch meldet, beziehen Sie den Wert ReportBatchItemFailures in FunctionResponseTypes ein. Weitere Informationen finden Sie unter Melden von Batch-Elementen.

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1 bedeutet unendlich: Lambda verwirft keine Datensätze (die Datenaufbewahrungseinstellungen von Kinesis Data Streams gelten weiterhin)

Minimum: -1

Höchstwert: 604 800

MaximumRetryAttempts

N

-1

-1 bedeutet unendlich: Fehlgeschlagene Datensätze werden wiederholt, bis der Datensatz abläuft

Minimum: -1

Höchstwert: 10 000.

ParallelizationFactor

N

1

Maximum: 10

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON, oder LATEST

StartingPositionTimestamp

N

Nur gültig, wenn auf StartingPosition AT_TIMESTAMP gesetzt ist. Die Zeit, ab der mit dem Lesen begonnen werden soll, in Unix-Zeitsekunden

TumblingWindowInSeconds

N

Minimum: 0

Maximum: 900