Verwendung AWS Lambda mit Amazon DynamoDB - AWS Lambda

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwendung AWS Lambda mit Amazon DynamoDB

Anmerkung

Wenn Sie Daten an ein anderes Ziel als eine Lambda-Funktion senden oder die Daten vor dem Senden anreichern möchten, finden Sie weitere Informationen unter Amazon EventBridge Pipes.

Sie können eine AWS Lambda Funktion verwenden, um Datensätze in einem Amazon DynamoDB DynamoDB-Stream zu verarbeiten. Mit DynamoDB Streams können Sie eine Lambda-Funktion auslösen, damit jedes Mal, wenn eine DynamoDB-Tabelle aktualisiert wird, weitere Aufgaben ausgeführt werden.

Lambda liest Datensätze aus dem Stream und ruft Ihre Funktion synchron mit einem Ereignis auf, das Stream-Datensätze enthält. Lambda liest Datensätze in Batches und ruft Ihre Funktion auf, um Datensätze aus dem Batch zu verarbeiten.

Beispielereignis

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}

Abrufen und Stapeln von Streams

Lambda fragt Shards in Ihrem DynamoDB-Stream nach Datensätzen bei einer Basisrate von viermal pro Sekunde ab. Sind Datensätze verfügbar, ruft Lambda Ihre Funktion auf und wartet auf das Ergebnis. Ist die Verarbeitung erfolgreich, setzt Lambda die Abrufe fort, bis es weitere Datensätze erhält.

Standardmäßig ruft Lambda Ihre Funktion auf, sobald Datensätze verfügbar sind. Wenn der Batch, den Lambda aus der Ereignisquelle liest, nur einen Datensatz enthält, sendet Lambda nur einen Datensatz an die Funktion. Damit die Funktion nicht mit einer kleinen Anzahl von Datensätzen aufgerufen wird, können Sie die Ereignisquelle anweisen, Datensätze bis zu 5 Minuten lang zu puffern, indem Sie ein Batch-Fenster konfigurieren. Bevor die Funktion aufgerufen wird, liest Lambda so lange Datensätze aus der Ereignisquelle, bis es einen vollständigen Batch erfasst hat, das Batch-Verarbeitungsfenster abläuft oder der Batch die Nutzlastgrenze von 6 MB erreicht. Weitere Informationen finden Sie unter Batching-Verhalten.

Warnung

Lambda-Ereignisquellenzuordnungen verarbeiten jedes Ereignis mindestens einmal, und es kann zu einer doppelten Verarbeitung von Datensätzen kommen. Um mögliche Probleme im Zusammenhang mit doppelten Ereignissen zu vermeiden, empfehlen wir Ihnen dringend, Ihren Funktionscode idempotent zu machen. Weitere Informationen finden Sie im Knowledge Center unter Wie mache ich meine Lambda-Funktion idempotent?. AWS

Konfigurieren Sie die ParallelizationFactorEinstellung so, dass ein Shard eines DynamoDB-Streams mit mehr als einem Lambda-Aufruf gleichzeitig verarbeitet wird. Sie können die Anzahl der gleichzeitigen Batches angeben, die Lambda von einem Shard über einen Parallelisierungsfaktor von 1 (Standard) bis 10 abfragt. Wenn Sie die Anzahl der gleichzeitigen Batches pro Shard erhöhen, sorgt Lambda trotzdem für die Verarbeitung in der Reihenfolge auf Artikelebene (Partition und Sortierschlüssel).

Startpositionen für Abfragen und Streams

Beachten Sie, dass die Stream-Abfrage bei der Erstellung und Aktualisierung der Zuordnung von Ereignisquellen letztendlich konsistent ist.

  • Bei der Erstellung der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis mit der Abfrage von Ereignissen aus dem Stream begonnen wird.

  • Bei Aktualisierungen der Zuordnung von Ereignisquellen kann es mehrere Minuten dauern, bis die Abfrage von Ereignissen aus dem Stream gestoppt und neu gestartet wird.

Dieses Verhalten bedeutet, dass, wenn Sie LATEST als Startposition für den Stream angeben, die Zuordnung von Ereignisquellen bei der Erstellung oder Aktualisierung möglicherweise Ereignisse übersieht. Um sicherzustellen, dass keine Ereignisse übersehen werden, geben Sie die Stream-Startposition als TRIM_HORIZON an.

Gleichzeitige Leser eines Shard in DynamoDB Streams

Für Einzelregionstabellen, bei denen es sich nicht um globale Tabellen handelt, können Sie bis zu zwei Lambda-Funktionen entwerfen, die gleichzeitig aus demselben DynamoDB-Streams-Shard lesen. Eine Überschreitung dieses Grenzwerts kann zu einer Anforderungsdrosselung führen. Für globale Tabellen empfehlen wir, die Anzahl der gleichzeitigen Funktionen auf eine zu beschränken, um eine Anforderungsdrosselung zu vermeiden.

Berechtigungen für die Ausführungsrolle

Die AWSLambdaDynamoDBExecutionRole AWS verwaltete Richtlinie umfasst die Berechtigungen, die Lambda benötigt, um aus Ihrem DynamoDB-Stream zu lesen. Fügen Sie diese verwaltete Richtlinie der Ausführungsrolle Ihrer Funktion hinzu.

Um Datensätze fehlgeschlagener Batches an eine SQS-Standardwarteschlange oder ein SNS-Standardthema zu senden, benötigt Ihre Funktion zusätzliche Berechtigungen. Jeder Zielservice benötigt eine andere Berechtigung wie folgt:

Fügen Sie Berechtigungen hinzu und erstellen Sie die Zuordnung der Ereignisquelle

Erstellen Sie eine Ereignisquellenzuordnung, um Lambda anzuweisen, Datensätze aus Ihrem Stream an eine Lambda-Funktion zu senden. Sie können mehrere Ereignisquellen-Zuweisung erstellen, um gleiche Daten mit mehreren Lambda-Funktionen oder Elemente aus mehreren Streams mit nur einer Funktion zu verarbeiten.

Um Ihre Funktion so zu konfigurieren, dass sie aus DynamoDB Streams liest, fügen Sie die AWSLambdaDynamoDBExecutionRole AWS verwaltete Richtlinie Ihrer Ausführungsrolle hinzu und erstellen Sie dann einen DynamoDB-Trigger.

Um Berechtigungen hinzuzufügen und einen Trigger zu erstellen
  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie den Namen einer Funktion aus.

  3. Wählen Sie die Registerkarte Konfiguration und dann Berechtigungen aus.

  4. Wählen Sie unter Rollenname den Link zu Ihrer Ausführungsrolle aus. Dieser Link öffnet die Rolle in der IAM-Konsole.

    Link zur Ausführungsrolle
  5. Wählen Sie Berechtigungen hinzufügen aus und wählen Sie dann Richtlinien direkt anhängen aus.

    Fügen Sie Richtlinien in der IAM-Konsole hinzu
  6. Geben Sie im Suchfeld AWSLambdaDynamoDBExecutionRole ein. Fügen Sie diese Richtlinie Ihrer Ausführungsrolle hinzu. Dies ist eine AWS verwaltete Richtlinie, die die Berechtigungen enthält, die Ihre Funktion zum Lesen aus dem DynamoDB-Stream benötigt. Weitere Informationen zu dieser Richtlinie finden Sie AWSLambdaDynamoDBExecutionRolein der Referenz zu AWS verwalteten Richtlinien.

  7. Kehren Sie zu Ihrer Funktion in der Lambda-Konsole zurück. Wählen Sie unter Function overview (Funktionsübersicht) die Option Add trigger (Trigger hinzufügen).

    Abschnitt „Funktionsübersicht“ der Lambda-Konsole
  8. Wählen Sie einen Auslösertyp aus.

  9. Konfigurieren Sie die erforderlichen Optionen und wählen Sie dann Add (Hinzufügen) aus.

Lambda unterstützt die folgenden Optionen für DynamoDB-Ereignisquellen:

Optionen für die Ereignisquelle
  • DynamoDB-Tabelle – Die DynamoDB-Tabelle, aus der Datensätze gelesen werden sollen.

  • Batchgröße – Die Anzahl der Datensätze, die in jedem Batch bis zu 10.000 an die Funktion gesendet werden sollen. Lambda übergibt alle Datensätze im Batch in einem einzigen Aufruf an die Funktion, solange die Gesamtgröße der Ereignisse nicht das Nutzlast-Limit für synchrone Aufrufe überschreitet (6 MB).

  • Batchfenster – Geben Sie die maximale Zeitspanne zur Erfassung von Datensätzen vor dem Aufruf der Funktion in Sekunden an.

  • Startposition – Verarbeiten Sie nur neue Datensätze oder alle vorhandenen Datensätze.

    • Neueste – Verarbeiten Sie Datensätze, die neu zum Stream hinzugefügt wurden.

    • Horizont trimmen – Verarbeiten Sie alle Datensätze im Stream.

    Nach der Verarbeitung aller vorhandenen Datensätze hat die Funktion aufgeholt und setzt die Verarbeitung neuer Datensätze fort.

  • On-failure destination (Ziel bei Ausfall) – Eine SQS-Warteschlange oder ein SNS-Thema für Datensätze, die nicht verarbeitet werden können. Wenn Lambda einen Datensatz-Batch verwirft, weil er zu alt ist oder alle Wiederholungen erschöpft hat, sendet es Details zum Batch an die Warteschlange oder das Thema.

  • Wiederholungsversuche – Die maximale Anzahl von Wiederholungen von Lambda, wenn die Funktion einen Fehler zurückgibt. Dies gilt nicht für Servicefehler oder Drosselungen, bei denen der Batch die Funktion nicht erreicht hat.

  • Höchstalter des Datensatzes – Das maximale Alter eines Datensatzes, den Lambda an Ihre Funktion sendet.

  • Batch bei Fehler aufteilen – Wenn die Funktion einen Fehler zurückgibt, teilen Sie den Batch vor dem erneuten Versuch in zwei Teile. Ihre ursprüngliche Einstellung für die Batch-Größe bleibt unverändert.

  • Gleichzeitige Batches pro Shard – Verarbeitet gleichzeitig mehrere Batches aus demselben Shard.

  • Aktiviert – Auf „true“ festlegen, um die Ereignisquellenzuordnung zu aktivieren. Auf "false" festlegen, um die Verarbeitung von Datensätzen zu beenden. Lambda merkt sich den zuletzt verarbeiteten Datensatz und setzt die Verarbeitung nach erneuter Aktivierung der Zuordnung an dieser Stelle fort.

Anmerkung

Für GetRecords API-Aufrufe, die von Lambda als Teil von DynamoDB-Triggern aufgerufen werden, fallen keine Gebühren an.

Um die Konfiguration der Ereignisquelle zu einem späteren Zeitpunkt zu verwalten, wählen Sie den Auslöser im Designer aus.

Fehlerbehandlung

Die Fehlerbehandlung für DynamoDB-Ereignisquellenzuordnungen hängt davon ab, ob der Fehler vor dem Aufruf der Funktion oder während des Funktionsaufrufs auftritt:

Wenn die Fehlerbehandlungsmaßnahmen fehlschlagen, verwirft Lambda die Datensätze und setzt die Verarbeitung von Batches aus dem Stream fort. Bei den Standardeinstellungen bedeutet dies, dass ein fehlerhafter Datensatz die Verarbeitung auf dem betroffenen Shard für bis zu einen Tag blockieren kann. Um dies zu vermeiden, konfigurieren Sie die Ereignisquellenzuordnung Ihrer Funktion mit einer angemessenen Anzahl von Wiederholungen und einem maximalen Datensatzalter, das zu Ihrem Anwendungsfall passt.

Konfiguration von Zielen für fehlgeschlagene Aufrufe

Um Datensätze zu fehlgeschlagenen Aufrufen zur Zuordnung von Ereignisquellen beizubehalten, fügen Sie der Zuordnung von Ereignisquellen Ihrer Funktion ein Ziel hinzu. Jeder an das Ziel gesendete Datensatz ist ein JSON-Dokument mit Metadaten über den fehlgeschlagenen Aufruf. Sie können jedes Amazon SNS SNS-Thema oder jede Amazon SQS SQS-Warteschlange als Ziel konfigurieren. Ihre Ausführungsrolle muss über Berechtigungen für das Ziel verfügen:

Gehen Sie folgendermaßen vor, um ein Ausfallziel mit der Konsole zu konfigurieren:

  1. Öffnen Sie die Seite Funktionen der Lambda-Konsole.

  2. Wählen Sie eine Funktion aus.

  3. Wählen Sie unter Function overview (Funktionsübersicht) die Option Add destination (Ziel hinzufügen).

  4. Wählen Sie als Quelle die Option Aufruf der Zuordnung von Ereignisquellen aus.

  5. Wählen Sie für die Zuordnung von Ereignisquellen eine Ereignisquelle aus, die für diese Funktion konfiguriert ist.

  6. Wählen Sie für Bedingung die Option Bei Ausfall aus. Für Aufrufe zur Zuordnung von Ereignisquellen ist dies die einzig akzeptierte Bedingung.

  7. Wählen Sie unter Zieltyp den Zieltyp aus, an den Lambda Aufrufdatensätze sendet.

  8. Wählen Sie unter Destination (Ziel) eine Ressource aus.

  9. Wählen Sie Save aus.

Sie können mit () auch ein Ziel für den Fall eines Fehlers konfigurieren. AWS Command Line Interface AWS CLI Mit dem folgenden Befehl create-event-source-mapping wird beispielsweise eine Ereignisquellenzuordnung mit einem SQS-Ziel für den Fall eines Fehlers hinzugefügt: MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Der folgende Befehl update-event-source-mapping aktualisiert eine Ereignisquellenzuordnung, sodass fehlgeschlagene Aufrufdatensätze nach zwei Wiederholungsversuchen oder wenn die Datensätze älter als eine Stunde sind, an ein SNS-Ziel gesendet werden.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

Aktualisierte Einstellungen werden asynchron angewendet und werden erst nach Abschluss des Vorgangs in der Ausgabe berücksichtigt. Verwenden Sie den Befehl get-event-source-mapping, um den aktuellen Status anzuzeigen.

Um ein Ziel zu entfernen, geben Sie eine leere Zeichenfolge als Argument für den destination-config-Parameter an:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Das folgende Beispiel zeigt einen Aufrufdatensatz für einen DynamoDB-Stream.

Beispiel Aufrufdatensatz
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

Sie können diese Informationen verwenden, um die betroffenen Datensätze aus dem Stream für die Fehlersuche abzurufen. Die tatsächlichen Datensätze sind nicht enthalten, daher müssen Sie diesen Datensatz verarbeiten und aus dem Stream abrufen, bevor sie ablaufen und verloren gehen.

CloudWatch Amazon-Metriken

Lambda gibt die IteratorAge-Metrik aus, wenn Ihre Funktion die Verarbeitung eines Batches von Datensätzen fertigstellt. Die Metrik gibt an, wie alt der letzte Datensatz im Batch bei Fertigstellung der Verarbeitung war. Wenn Ihre Funktion neue Ereignisse verarbeitet, können Sie mit dem Iterator-Alter die Latenz zwischen dem Zeitpunkt, zu dem ein Datensatz hinzugefügt wird und dem Zeitpunkt, zu dem die Funktion verarbeitet wird, schätzen.

Eine steigende Tendenz beim Iterator-Alter kann auf Probleme mit Ihrer Funktion hindeuten. Weitere Informationen finden Sie unter Arbeiten mit Lambda-Funktionsmetriken.

Zeitfenster

Lambda-Funktionen können kontinuierliche Stream-Verarbeitungsanwendungen ausführen. Ein Stream entspricht einer unbegrenzten Menge von Daten, die kontinuierlich durch Ihre Anwendung fließen. Um Informationen aus dieser sich ständig aktualisierenden Eingabe zu analysieren, können Sie die enthaltenen Datensätze mithilfe eines zeitlich definierten Fensters binden.

Rollierende Fenster sind unterschiedliche Zeitfenster, die sich in regelmäßigen Abständen öffnen und schließen. Standardmäßig sind Lambda-Aufrufe zustandslos – Sie können sie nicht für die Verarbeitung von Daten über mehrere kontinuierliche Aufrufe hinweg ohne eine externe Datenbank verwenden. Mit rollierenden Fenstern können Sie jedoch Ihren Status über Aufrufe hinweg beibehalten. Dieser Zustand enthält das Gesamtergebnis der Nachrichten, die zuvor für das aktuelle Fenster verarbeitet wurden. Ihr Zustand kann maximal 1 MB pro Shard betragen. Wenn er diese Größe überschreitet, wird Lambda das Fenster vorzeitig beenden.

Jeder Datensatz in einem Stream gehört zu einem bestimmten Fenster. Lambda verarbeitet jeden Datensatz mindestens einmal, garantiert jedoch nicht, dass jeder Datensatz nur einmal verarbeitet wird. In seltenen Fällen, etwa bei der Fehlerbehandlung, werden einige Datensätze möglicherweise mehrmals verarbeitet. Datensätze werden beim ersten Mal immer in der richtigen Reihenfolge verarbeitet. Wenn Datensätze mehr als einmal verarbeitet werden, werden sie nicht in der richtigen Reihenfolge verarbeitet.

Aggregation und Verarbeitung

Ihre benutzerverwaltete Funktion wird sowohl zur Aggregation als auch zur Verarbeitung der Endergebnisse dieser Aggregation aufgerufen. Lambda aggregiert alle im Fenster empfangenen Datensätze. Sie können diese Datensätze in mehreren Stapeln erhalten, jeweils als ein separater Aufruf. Jeder Aufruf erhält einen Zustand. Wenn Sie also rollierende Fenster verwenden, muss Ihre Lambda-Funktionsantwort eine state-Eigenschaft enthalten. Wenn die Antwort keine state-Eigenschaft enthält, betrachtet Lambda dies als fehlgeschlagenen Aufruf. Um diese Bedingung zu erfüllen, kann Ihre Funktion ein TimeWindowEventResponse-Objekt zurückgeben, das die folgende JSON-Form aufweist:

Beispiel TimeWindowEventResponse-Werte
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Anmerkung

Für Java-Funktionen empfehlen wir, eine Map<String, String> zu verwenden, um den Status darzustellen.

Am Ende des Fensters wird das Flag isFinalInvokeForWindow auf true gesetzt, um anzugeben, dass es sich um den Endzustand handelt und dass es für die Verarbeitung bereit ist. Nach der Verarbeitung werden das Fenster und Ihr endgültiger Aufruf wird abgeschlossen, und dann wird der Zustand gelöscht.

Am Ende Ihres Fensters verwendet Lambda die endgültige Verarbeitung für Aktionen an den Aggregationsergebnissen. Ihre endgültige Verarbeitung wird synchron aufgerufen. Nach erfolgreichem Aufruf zeigt Ihre Funktion auf die Sequenznummer und die Stream-Verarbeitung wird fortgesetzt. Wenn der Aufruf nicht erfolgreich ist, unterbricht Ihre Lambda-Funktion die weitere Verarbeitung bis zu einem erfolgreichen Aufruf.

Beispiel DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Konfiguration

Sie können rollierende Fenster konfigurieren, wenn Sie eine Ereignisquellenzuordnung erstellen oder aktualisieren. Um ein Taumelfenster zu konfigurieren, geben Sie das Fenster in Sekunden an (TumblingWindowInSeconds). Mit dem folgenden Beispielbefehl AWS Command Line Interface (AWS CLI) wird eine Quellenzuordnung für Streaming-Ereignisse erstellt, die ein Wechselfenster von 120 Sekunden hat. Die für Aggregation und Verarbeitung definierte Lambda-Funktion wird tumbling-window-example-function genannt.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds 120

Lambda bestimmt die rollierenden Fenstergrenzen basierend auf dem Zeitpunkt, zu dem Datensätze in den Stream eingefügt wurden. Für alle Datensätze steht ein ungefährer Zeitstempel zur Verfügung, den Lambda in Grenzbestimmungen verwendet.

Rollierende Fensteraggregationen unterstützen kein Resharding. Wenn der Shard endet, betrachtet Lambda das Fenster als geschlossen und die untergeordneten Shards beginnen ihr eigenes Fenster in einem neuen Zustand.

Rollierende Fenster unterstützen vollständig die bestehenden Wiederholungsrichtlinien maxRetryAttempts und maxRecordAge.

Beispiel Handler.py – Aggregation und Verarbeitung

Die folgende Python-Funktion veranschaulicht, wie Sie Ihren Endzustand aggregieren und dann verarbeiten:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

Melden von Batch-Elementen

Beim Konsumieren und Verarbeiten von Streaming-Daten aus einer Ereignisquelle werden standardmäßig Lambda-Checkpoints auf die höchste Sequenznummer eines Batches nur dann überprüft, wenn der Batch ein voller Erfolg ist. Lambda behandelt alle anderen Ergebnisse als einen vollständigen Fehler und versucht, den Batch bis zum Wiederholungslimit zu verarbeiten. Um beim Verarbeiten von Stapeln aus einem Stream Teilerfolge zu ermöglichen, aktivieren Sie ReportBatchItemFailures. Das Zulassen von Teilerfolgen kann dazu beitragen, die Anzahl der Wiederholungen in einer Aufzeichnung zu reduzieren, obwohl die Möglichkeit von Wiederholungen in einer erfolgreichen Aufzeichnung nicht vollständig verhindert wird.

Um die Option zu aktivierenReportBatchItemFailures, nehmen Sie den Enum-Wert ReportBatchItemFailures in die FunctionResponseTypenliste auf. Diese Liste zeigt an, welche Antworttypen für Ihre Funktion aktiviert sind. Sie können diese Liste konfigurieren, wenn Sie eine Ereignisquellenzuordnung erstellen oder aktualisieren.

Berichtsyntax

Beim Konfigurieren von Berichten zu Batch-Elementfehlern wird die StreamsEventResponse-Klasse mit einer Liste von Batch-Elementfehlern zurückgegeben. Sie können ein StreamsEventResponse-Objekt verwenden, um die Sequenznummer des ersten fehlgeschlagenen Datensatzes im Batch zurückzugeben. Sie können auch Ihre eigene benutzerdefinierte Klasse mit der richtigen Antwortsyntax erstellen. Die folgende JSON-Struktur zeigt die erforderliche Antwortsyntax:

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
Anmerkung

Wenn das batchItemFailures-Array mehrere Elemente enthält, verwendet Lambda den Datensatz mit der niedrigsten Sequenznummer als Kontrollpunkt. Lambda wiederholt dann alle Datensätze ab diesem Kontrollpunkt.

Erfolgs- und Misserfolgsbedingungen

Lambda behandelt einen Batch als vollständigen Erfolg, wenn Sie eines der folgenden Elemente zurückgeben:

  • Eine leere batchItemFailure-Liste

  • Eine ungültige batchItemFailure-Liste

  • Ein leeres EventResponse

  • Ein ungültiges EventResponse

Lambda behandelt einen Batch als vollständigen Misserfolg, wenn Sie eines der folgenden Elemente zurückgeben:

  • Eine leere Zeichenfolge itemIdentifier

  • Ein ungültiges itemIdentifier

  • Ein itemIdentifier mit einem falschen Schlüsselnamen

Lambda wiederholt Fehler basierend auf Ihrer Wiederholungsstrategie.

Einen Batch halbieren

Wenn Ihr Aufruf fehlschlägt und BisectBatchOnFunctionError eingeschaltet ist, wird der Stapel unabhängig von Ihrer ReportBatchItemFailures-Einstellung halbiert.

Wenn eine partielle Batch-Erfolgsantwort empfangen wird und sowohl BisectBatchOnFunctionError als auch ReportBatchItemFailures aktiviert sind, wird der Batch mit der zurückgegebenen Sequenznummer halbiert und Lambda versucht nur die verbleibenden Datensätze erneut.

Hier sind einige Beispiele für Funktionscodes, die die Liste der fehlgeschlagenen Nachrichten-IDs im Batch zurückgeben:

.NET
AWS SDK for .NET
Anmerkung

Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von DynamoDB-Batchelementfehlern mit Lambda mithilfe von.NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>(); StreamsEventResponse streamsEventResponse = new StreamsEventResponse(); foreach (var record in dynamoEvent.Records) { try { var sequenceNumber = record.Dynamodb.SequenceNumber; context.Logger.LogInformation(sequenceNumber); } catch (Exception ex) { context.Logger.LogError(ex.Message); batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber }); } } if (batchItemFailures.Count > 0) { streamsEventResponse.BatchItemFailures = batchItemFailures; } context.Logger.LogInformation("Stream processing complete."); return streamsEventResponse; } }
Go
SDK für Go V2
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von DynamoDB-Batchelementfehlern mit Lambda mithilfe von Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type BatchItemFailure struct { ItemIdentifier string `json:"ItemIdentifier"` } type BatchResult struct { BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"` } func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) { var batchItemFailures []BatchItemFailure curRecordSequenceNumber := "" for _, record := range event.Records { // Process your record curRecordSequenceNumber = record.Change.SequenceNumber } if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber}) } batchResult := BatchResult{ BatchItemFailures: batchItemFailures, } return &batchResult, nil } func main() { lambda.Start(HandleRequest) }
Java
SDK für Java 2.x
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von DynamoDB-Batchelementfehlern mit Lambda mithilfe von Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
JavaScript
SDK für JavaScript (v3)
Anmerkung

Es gibt noch mehr dazu GitHub. Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von. JavaScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event) => { const records = event.Records; let curRecordSequenceNumber = ""; for (const record of records) { try { // Process your record curRecordSequenceNumber = record.dynamodb.SequenceNumber; } catch (e) { // Return failed record's sequence number return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] }; } } return { batchItemFailures: [] }; };

Melden von DynamoDB-Batchelementfehlern mit Lambda unter Verwendung von. TypeScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { DynamoDBBatchItemFailure, DynamoDBStreamEvent } from "aws-lambda"; export const handler = async (event: DynamoDBStreamEvent): Promise<DynamoDBBatchItemFailure[]> => { const batchItemsFailures: DynamoDBBatchItemFailure[] = [] let curRecordSequenceNumber for(const record of event.Records) { curRecordSequenceNumber = record.dynamodb?.SequenceNumber if(curRecordSequenceNumber) { batchItemsFailures.push({ itemIdentifier: curRecordSequenceNumber }) } } return batchItemsFailures }
PHP
SDK für PHP
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von DynamoDB-Batchelementfehlern mit Lambda mithilfe von PHP.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $dynamoDbEvent = new DynamoDbEvent($event); $this->logger->info("Processing records"); $records = $dynamoDbEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK für Python (Boto3)
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von DynamoDB-Batchelementfehlern mit Lambda mithilfe von Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Ruby
SDK für Ruby
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von DynamoDB-Batchelementfehlern mit Lambda mithilfe von Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event:, context:) records = event["Records"] cur_record_sequence_number = "" records.each do |record| begin # Process your record cur_record_sequence_number = record["dynamodb"]["SequenceNumber"] rescue StandardError => e # Return failed record's sequence number return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]} end end {"batchItemFailures" => []} end
Rust
SDK für Rust
Anmerkung

Es gibt noch mehr dazu. GitHub Das vollständige Beispiel sowie eine Anleitung zum Einrichten und Ausführen finden Sie im Repository mit Serverless-Beispielen.

Melden von DynamoDB-Batchelementfehlern mit Lambda mithilfe von Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Konfigurationsparameter zu Amazon DynamoDB Streams

Alle Lambda-Ereignisquelltypen verwenden dieselben CreateEventSourceMappingUpdateEventSourceMappingAPI-Operationen. Allerdings gelten nur einige der Parameter für DynamoDB Streams.

Ereignisquellparameter, die für DynamoDB Streams gelten
Parameter Erforderlich Standard Hinweise

BatchSize

N

100

Höchstwert: 10 000.

BisectBatchOnFunctionFehler

N

false

DestinationConfig

N

Ein Ziel der Amazon-SQS-Standardwarteschlange oder des Amazon-SNS-Standardthemas für verworfene Datensätze

Aktiviert

N

true

EventSourceArn

Y

Der ARN des Datenstroms oder eines Stream-Konsumenten

FilterCriteria

N

Lambda-Ereignisfilterung

FunctionName

Y

FunctionResponseTypen

N

Damit Ihre Funktion bestimmte Fehler in einem Batch meldet, beziehen Sie den Wert ReportBatchItemFailures in FunctionResponseTypes ein. Weitere Informationen finden Sie unter Melden von Batch-Elementen.

MaximumBatchingWindowInSekunden

N

0

MaximumRecordAgeInSekunden

N

-1

-1 bedeutet unendlich: Fehlgeschlagene Datensätze werden wiederholt, bis der Datensatz abläuft. Das Datenaufbewahrungslimit für DynamoDB Streams beträgt 24 Stunden.

Minimum: -1

Höchstwert: 604 800

MaximumRetryVersuche

N

-1

-1 bedeutet unendlich: Fehlgeschlagene Datensätze werden wiederholt, bis der Datensatz abläuft

Minimum: 0

Höchstwert: 10 000.

ParallelizationFactor

N

1

Maximum: 10

StartingPosition

Y

TRIM_HORIZON oder LATEST

TumblingWindowInSeconds

N

Minimum: 0

Maximum: 900