Utilizzo AWS Lambda con Amazon DynamoDB - AWS Lambda

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo AWS Lambda con Amazon DynamoDB

Puoi utilizzare una AWS Lambda funzione per elaborare i record in un flusso Amazon DynamoDB. Con DynamoDB Streams, è possibile attivare una funzione Lambda per eseguire lavoro aggiuntivo ogni volta che una tabella DynamoDB viene aggiornata.

Lambda legge i record dal flusso e richiama la funzione in modo sincrono con un evento che contiene record di flusso. Lambda legge i record in batch e richiama la funzione per elaborare i record dal batch.

Esempio di evento

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}

Flussi di polling e batching

Lambda esegue il polling delle partizioni presenti nel proprio flusso DynamoDB ricercando i record a una velocità di base di 4 volte al secondo. Quando sono disponibili dei record, Lambda invoca la funzione e attende il risultato. Se l'elaborazione ha esito positivo, Lambda riprende il polling fino a quando non riceve più record.

Per impostazione predefinita, Lambda richiama la funzione non appena i record sono disponibili. Se il batch che Lambda legge dall'origine eventi contiene un solo record, Lambda invia solo un record alla funzione. Per evitare di richiamare la funzione con pochi record è possibile, configurando un periodo di batch, chiedere all'origine eventi di memorizzare nel buffer i registri per un massimo di 5 minuti. Prima di richiamare la funzione, Lambda continua a leggere i registri dall'origine eventi fino a quando non ha raccolto un batch completo, fino alla scadenza del periodo di batch o fino a quando il batch non ha raggiunto il limite del payload di 6 MB. Per ulteriori informazioni, consulta Comportamento di batching.

avvertimento

Le mappature delle sorgenti degli eventi Lambda elaborano ogni evento almeno una volta e può verificarsi un'elaborazione duplicata dei record. Per evitare potenziali problemi legati agli eventi duplicati, ti consigliamo vivamente di rendere idempotente il codice della funzione. Per ulteriori informazioni, consulta Come posso rendere idempotente la mia funzione Lambda nel Knowledge Center. AWS

Configura l'ParallelizationFactorimpostazione per elaborare uno shard di un flusso DynamoDB con più di una chiamata Lambda contemporaneamente. È possibile specificare il numero di batch simultanei di cui Lambda esegue il polling da una partizione da un fattore di parallelizzazione compreso tra da 1 (predefinito) e 10. Quando si aumenta il numero di batch simultanei per shard, Lambda garantisce comunque l'elaborazione in ordine a livello di articolo (chiave di partizione e ordinamento).

Posizioni di partenza di polling e flussi

Tieni presente che il polling dei flussi durante la creazione e gli aggiornamenti dello strumento di mappatura dell’origine degli eventi alla fine è coerente.

  • Durante la creazione dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.

  • Durante gli aggiornamenti dello strumento di mappatura dell'origine degli eventi, potrebbero essere necessari alcuni minuti per l'avvio degli eventi di polling dal flusso.

Questo comportamento implica che se specifichi LATEST come posizione iniziale del flusso, lo strumento di mappatura dell’origine degli eventi potrebbe perdere eventi durante la creazione o gli aggiornamenti. Per garantire che nessun evento venga perso, specifica la posizione di inizio del flusso come TRIM_HORIZON.

Lettori simultanei di una partizione in DynamoDB Streams

Per le tabelle di una regione singola che non sono tabelle globali, è possibile progettare contemporaneamente fino a due funzioni Lambda per leggere dalla stessa partizione di DynamoDB Streams nello stesso momento. Il superamento di questo limite comporta una limitazione delle richieste. Per le tabelle globali consigliamo di limitare il numero di lettori simultanei a uno per evitare richieste di limitazione della larghezza di banda della rete.

Autorizzazioni del ruolo di esecuzione

La policy AWSLambdaDynamoDBExecutionRole AWS gestita include le autorizzazioni che Lambda deve leggere dal tuo flusso DynamoDB. Aggiungi questa policy gestita al ruolo di esecuzione della tua funzione.

Per inviare i record dei batch non riusciti a una coda SQS standard o a un argomento SNS standard, la funzione necessita di autorizzazioni aggiuntive. Ogni servizio di destinazione richiede un'autorizzazione diversa, come segue:

Aggiungi le autorizzazioni e crea la mappatura delle sorgenti degli eventi

Crea una mappatura dell'origine eventi per indicare a Lambda di inviare i record dal proprio flusso a una funzione Lambda. È possibile creare più mappature dell'origine eventi per elaborare gli stessi dati con più funzioni Lambda o per elaborare elementi da più flussi con una singola funzione.

Per configurare la tua funzione per la lettura da DynamoDB Streams, collega la policy gestita AWSLambdaDynamoDBExecutionRole AWS al tuo ruolo di esecuzione e quindi crea un trigger DynamoDB.

Per aggiungere autorizzazioni e creare un trigger
  1. Aprire la pagina Functions (Funzioni) della console Lambda.

  2. Scegliere il nome della funzione.

  3. Quindi, seleziona la scheda Configuration (Configurazione) e poi Permissions (Autorizzazioni).

  4. In Nome del ruolo, scegli il link al tuo ruolo di esecuzione. Questo link apre il ruolo nella console IAM.

    Collegamento al ruolo di esecuzione
  5. Seleziona Aggiungi autorizzazioni, quindi seleziona Collega policy.

    Allega le politiche nella console IAM
  6. Inserisci AWSLambdaDynamoDBExecutionRole nel campo di ricerca. Aggiungi questa policy al tuo ruolo di esecuzione. Si tratta di una policy AWS gestita che contiene le autorizzazioni che la funzione deve leggere dal flusso DynamoDB. Per ulteriori informazioni su questa politica, consulta il AWS Managed Policy AWSLambdaDynamoDBExecutionRoleReference.

  7. Torna alla tua funzione nella console Lambda. In Panoramica delle funzioni, scegliere Aggiungi trigger.

    Sezione di panoramica delle funzioni della console Lambda
  8. Scegliere un tipo di trigger.

  9. Configurare le opzioni richieste, quindi scegliere Add (Aggiungi).

Lambda supporta le seguenti opzioni per le sorgenti di eventi DynamoDB:

Opzioni di origine eventi
  • DynamoDB table (Tabella DynamoDB): la tabella DynamoDB da cui leggere i record.

  • Batch size (Dimensione batch): il numero di record da inviare alla funzione in ogni batch, fino a 10.000. Lambda passa tutti i record del batch alla funzione in una singola chiamata, purché la dimensione totale degli eventi non superi il limite di payload per l'invocazione sincrona (6 MB).

  • Batch window (Periodo di batch): specifica il tempo massimo in secondi per la raccolta dei record prima di richiamare la funzione.

  • Starting position (Posizione iniziale): elabora solo i nuovi record o tutti i record esistenti.

    • Latest (Ultimi): consente di elaborare i nuovi record aggiunti al flusso.

    • Trim Horizon (Orizzonte di taglio): elabora tutti i record contenuti nel flusso.

    Dopo l'elaborazione di qualsiasi record esistente, la funzione è aggiornata e continua a elaborare nuovi record.

  • Destinazione in caso di errore: una coda SQS standard o un argomento SNS standard per i record che non è possibile elaborare. Quando Lambda scarta un batch di record perché è troppo datato o ha esaurito tutti i tentativi, invia i dettagli sul batch alla coda o all'argomento.

  • Retry attempts (Nuovi tentativi): il numero massimo di tentativi che Lambda effettua quando la funzione restituisce un errore. Non si applica agli errori di servizio o alle limitazioni in cui il batch non ha raggiunto la funzione.

  • Maximum age of record (Età massima del record): l'età massima di un record inviato da Lambda alla funzione.

  • Split batch on error (Dividi batch in caso di errore): quando la funzione restituisce un errore, divide il batch in due prima di un nuovo tentativo. L'impostazione originale delle dimensioni del batch rimane invariata.

  • Concurrent batches per shard (Batch simultanei per partizione): elabora più batch dalla stessa partizione simultaneamente.

  • Enabled (Abilitato): impostare su true per abilitare la mappatura dell'origine eventi. Impostare su false per interrompere l'elaborazione dei record. Lambda registra l'ultimo record elaborato e riprende l'elaborazione da quel punto quando la mappatura viene riabilitata.

Nota

Non ti vengono addebitati costi per le chiamate GetRecords API richiamate da Lambda come parte dei trigger di DynamoDB.

Per gestire la configurazione dell'origine eventi in un momento successivo, scegliere il trigger nel designer.

Gestione degli errori

La gestione degli errori per le mappature delle sorgenti degli eventi DynamoDB dipende dal fatto che l'errore si verifichi prima che la funzione venga richiamata o durante la chiamata della funzione:

Se le misure di gestione degli errori non riescono, Lambda elimina i record e continua l'elaborazione dei batch dal flusso. Con le impostazioni predefinite, ciò significa che un record errato può bloccare l'elaborazione sulla partizione interessata per un massimo di un giorno. Per evitare questa situazione, configura la mappatura dell'origine eventi della funzione con un numero ragionevole di tentativi e un'età massima dei record che sia adatta al caso d'uso.

Configurazione delle destinazioni per le chiamate non riuscite

Per mantenere i record delle chiamate non riuscite allo strumento di mappatura dell'origine degli eventi, aggiungi una destinazione allo strumento di mappatura dell'origine degli eventi della funzione. Ogni record inviato alla destinazione è un documento JSON con metadati sulla chiamata non riuscita. Puoi configurare qualsiasi argomento Amazon SNS o coda Amazon SQS come destinazione. Il tuo ruolo di esecuzione deve avere le autorizzazioni per la destinazione:

Per configurare una destinazione in caso di errore tramite la console, completa i seguenti passaggi:

  1. Aprire la pagina Funzioni della console Lambda.

  2. Scegliere una funzione.

  3. In Function overview (Panoramica delle funzioni), scegliere Add destination (Aggiungi destinazione).

  4. Per Origine, scegli Chiamata allo strumento di mappatura dell'origine degli eventi.

  5. Per Strumento di mappatura dell'origine degli eventi, scegli un'origine dell'evento configurata per questa funzione.

  6. Per Condizione, seleziona In caso di errore. Per le chiamate allo strumento di mappatura dell'origine degli eventi, questa è l'unica condizione accettata.

  7. Per Tipo di destinazione, scegli il tipo di destinazione a cui Lambda deve inviare i record di chiamata.

  8. Per Destination (Destinazione), scegliere una risorsa.

  9. Scegliere Save (Salva).

Puoi anche configurare una destinazione in caso di errore utilizzando (). AWS Command Line Interface AWS CLI Ad esempio, il seguente comando create-event-source-mapping aggiunge una mappatura dell'origine degli eventi con una destinazione SQS in caso di errore a: MyFunction

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

Il seguente comando update-event-source-mapping aggiorna una mappatura dell'origine degli eventi per inviare i record di chiamata non riuscita a una destinazione SNS dopo due tentativi o se i record risalgono a più di un'ora.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

Le impostazioni aggiornate sono applicate in modo asincrono e non sono riflesse nell'output fino al completamento del processo. Utilizzate il comando get-event-source-mapping per visualizzare lo stato corrente.

Per rimuovere una destinazione, fornisci una stringa vuota come argomento del parametro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

L'esempio seguente mostra un record di chiamata per un flusso DynamoDB.

Esempio Record di chiamata
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

È possibile utilizzare queste informazioni per recuperare i record interessati dal flusso per la risoluzione dei problemi. I record effettivi non sono inclusi, pertanto è necessario elaborare questo record e recuperarli dal flusso prima che scadano e vadano persi.

CloudWatch Metriche Amazon

Lambda emette il parametro IteratorAge quando la funzione termina l'elaborazione di un batch di record. Il parametro indica a quando risaliva l'ultimo record nel batch al momento del completamento dell'elaborazione. Se la funzione elabora nuovi eventi, è possibile utilizzare la cronologia di iterazione per stimare la latenza tra il momento in cui un record viene aggiunto e il momento in cui la funzione lo elabora.

Una tendenza in aumento nella cronologia di iterazione può indicare problemi con la funzione. Per ulteriori informazioni, consulta Utilizzo dei parametri delle funzioni Lambda.

Finestre temporali

Le funzioni Lambda possono eseguire applicazioni di elaborazione di flussi continui. Un flusso rappresenta dati illimitati che scorrono continuamente attraverso l'applicazione. Per analizzare le informazioni da questo input di aggiornamento continuo, è possibile associare i record inclusi utilizzando una finestra definita in termini di tempo.

Le finestre a cascata sono finestre temporali distinte che si aprono e si chiudono a intervalli regolari. Per impostazione predefinita, le invocazioni Lambda sono senza stato: non è possibile utilizzarle per l'elaborazione dei dati tra più invocazioni continue senza un database esterno. Tuttavia, con la finestra a cascata, è possibile mantenere il proprio stato tra le invocazioni. Questo stato contiene il risultato aggregato dei messaggi precedentemente elaborati per la finestra corrente. Lo stato può essere un massimo di 1 MB per partizione. Se supera quella dimensione, Lambda termina la finestra in anticipo.

Ogni record in un flusso appartiene a una finestra specifica. Lambda elaborerà ogni record almeno una volta, ma non garantisce che ogni record venga elaborato una sola volta. In rari casi, come nel caso della gestione degli errori, alcuni record potrebbero essere elaborati più di una volta. La prima volta i record vengono sempre elaborati in ordine. Se i record vengono elaborati più di una volta, possono essere elaborati fuori ordine.

Aggregazione ed elaborazione

La funzione gestita dall'utente viene richiamata sia per l'aggregazione che per l'elaborazione dei risultati finali di tale aggregazione. Lambda aggrega tutti i record ricevuti nella finestra. È possibile ricevere questi record in più batch, ciascuno come richiamo separato. Ogni richiamo riceve uno stato. Pertanto, quando si utilizzano finestre a cascata, la risposta della funzione Lambda deve contenere una proprietà di state. Se la risposta non contiene una proprietà di state, Lambda la considera un'invocazione non riuscita. Per soddisfare questa condizione, la funzione può restituire un oggetto TimeWindowEventResponse, che presenta la seguente forma JSON:

Esempio TimeWindowEventResponse valori
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
Nota

Per le funzioni Java, si consiglia di utilizzare una Map<String, String> per rappresentare lo stato.

Alla fine della finestra, il flag isFinalInvokeForWindow è impostato true per indicare che questo è lo stato finale e che è pronto per l'elaborazione. Dopo l'elaborazione, la finestra viene completata e il richiamo finale viene completato e quindi lo stato viene eliminato.

Al termine della finestra, Lambda utilizza l'elaborazione finale per le operazioni sui risultati dell'aggregazione. L'elaborazione finale viene richiamata in modo sincrono. Dopo il richiamo riuscito, la funzione controlla il numero di sequenza e l'elaborazione del flusso continua. Se il richiamo non ha esito positivo, la funzione Lambda sospende l'ulteriore elaborazione fino a quando non viene eseguito correttamente il richiamo.

Esempio DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Configurazione

È possibile configurare le finestre a cascata quando si crea o si aggiorna un mapping di origini di eventi. Per configurare una finestra ribaltabile, specifica la finestra in secondi () TumblingWindowInSeconds. Il seguente comando di esempio AWS Command Line Interface (AWS CLI) crea una mappatura della sorgente degli eventi in streaming con una finestra di rotazione di 120 secondi. La funzione Lambda definita per l'aggregazione e l'elaborazione è denominata tumbling-window-example-function.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds 120

Lambda determina i limiti delle finestre a cascata in base al momento in cui i record sono stati inseriti nel flusso. Tutti i record dispongono di un timestamp approssimativo che Lambda utilizza nelle determinazioni dei limiti.

Le aggregazioni delle finestre a cascata non supportano il risharding. Quando la partizione termina, Lambda considera la finestra chiusa e le partizioni secondarie iniziano la propria finestra in uno stato nuovo.

Le finestre a cascata supportano completamente le policy di ripetizione dei tentativi esistenti maxRetryAttempts e maxRecordAge.

Esempio Handler.py: aggregazione ed elaborazione

La seguente funzione Python mostra come aggregare e quindi elaborare lo stato finale:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

Segnalazione errori articoli batch

Quando si consumano ed elaborano i dati di streaming da un'origine eventi, per impostazione predefinita i Lambda imposta i checkpoint al numero di sequenza più alto di un batch solo quando il batch è riuscito completamente. Lambda tratta tutti gli altri risultati come un fallimento completo e riprova a elaborare il batch fino al limite di tentativi. Per consentire i successi parziali durante l'elaborazione di batch da un flusso, attivare ReportBatchItemFailures. Consentire successi parziali può contribuire a ridurre il numero di tentativi su un record, anche se non impedisce del tutto la possibilità di tentativi in un record riuscito.

Per attivarloReportBatchItemFailures, includete il valore enum ReportBatchItemFailures nell'elenco dei FunctionResponse tipi. Questo elenco indica quali tipi di risposta sono abilitati per la funzione. È possibile configurare questo elenco quando si crea o si aggiorna una mappatura della sorgente degli eventi.

Sintassi di report

Quando si configura la creazione di report sugli errori degli elementi batch, la StreamsEventResponse classe viene restituita con un elenco di errori degli articoli batch. È possibile utilizzare un StreamsEventResponse oggetto per restituire il numero di sequenza del primo record non riuscito nel batch. È inoltre possibile creare la propria classe personalizzata utilizzando la sintassi di risposta corretta. La seguente struttura JSON mostra la sintassi di risposta richiesta:

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
Nota

Se l'array batchItemFailures contiene più elementi, Lambda utilizza il record con il numero di sequenza più basso come checkpoint. Lambda quindi riprova tutti i record a partire da quel checkpoint.

Condizioni di successo e di errore

Lambda considera un batch come un successo completo se si restituisce uno degli elementi seguenti:

  • Una batchItemFailure lista vuota

  • Un batchItemFailure elenco nullo

  • Un vuoto EventResponse

  • Un valore nullo EventResponse

Lambda considera un batch come un fallimento completo se si restituisce uno degli elementi seguenti:

  • Una stringa vuota itemIdentifier

  • Un valore nullo itemIdentifier

  • Un itemIdentifier con un nome chiave errato

Lambda esegue nuovi tentativi in seguito ai fallimenti secondo la strategia di tentativi impostata.

Bisezione un batch

Se il richiamo fallisce ed BisectBatchOnFunctionError è attivato, il batch viene bisecato a prescindere dalle ReportBatchItemFailures impostazioni.

Quando si riceve una risposta di successo parziale del batch ed entrambi BisectBatchOnFunctionError e ReportBatchItemFailures sono attivati, il batch viene bisecato in corrispondenza del numero di sequenza restituito e Lambda ritenta solo i record rimanenti.

Di seguito sono riportati alcuni esempi di codice di funzione che restituiscono l'elenco degli ID dei messaggi con errori nel batch:

.NET
AWS SDK for .NET
Nota

C'è altro su. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch di DynamoDB con Lambda utilizzando.NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>(); StreamsEventResponse streamsEventResponse = new StreamsEventResponse(); foreach (var record in dynamoEvent.Records) { try { var sequenceNumber = record.Dynamodb.SequenceNumber; context.Logger.LogInformation(sequenceNumber); } catch (Exception ex) { context.Logger.LogError(ex.Message); batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber }); } } if (batchItemFailures.Count > 0) { streamsEventResponse.BatchItemFailures = batchItemFailures; } context.Logger.LogInformation("Stream processing complete."); return streamsEventResponse; } }
Go
SDK per Go V2
Nota

C'è altro da sapere. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch di DynamoDB con Lambda utilizzando Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type BatchItemFailure struct { ItemIdentifier string `json:"ItemIdentifier"` } type BatchResult struct { BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"` } func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) { var batchItemFailures []BatchItemFailure curRecordSequenceNumber := "" for _, record := range event.Records { // Process your record curRecordSequenceNumber = record.Change.SequenceNumber } if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber}) } batchResult := BatchResult{ BatchItemFailures: batchItemFailures, } return &batchResult, nil } func main() { lambda.Start(HandleRequest) }
Java
SDK per Java 2.x
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch di DynamoDB con Lambda utilizzando Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
JavaScript
SDK per (v3) JavaScript
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch di DynamoDB utilizzando Lambda. JavaScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event) => { const records = event.Records; let curRecordSequenceNumber = ""; for (const record of records) { try { // Process your record curRecordSequenceNumber = record.dynamodb.SequenceNumber; } catch (e) { // Return failed record's sequence number return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] }; } } return { batchItemFailures: [] }; };

Segnalazione degli errori degli elementi batch di DynamoDB utilizzando Lambda. TypeScript

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { DynamoDBBatchItemFailure, DynamoDBStreamEvent } from "aws-lambda"; export const handler = async (event: DynamoDBStreamEvent): Promise<DynamoDBBatchItemFailure[]> => { const batchItemsFailures: DynamoDBBatchItemFailure[] = [] let curRecordSequenceNumber for(const record of event.Records) { curRecordSequenceNumber = record.dynamodb?.SequenceNumber if(curRecordSequenceNumber) { batchItemsFailures.push({ itemIdentifier: curRecordSequenceNumber }) } } return batchItemsFailures }
PHP
SDK per PHP
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch di DynamoDB con Lambda utilizzando PHP.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $dynamoDbEvent = new DynamoDbEvent($event); $this->logger->info("Processing records"); $records = $dynamoDbEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK per Python (Boto3)
Nota

C'è altro da fare. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch di DynamoDB con Lambda utilizzando Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Ruby
SDK per Ruby
Nota

C'è altro da sapere. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch di DynamoDB con Lambda utilizzando Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event:, context:) records = event["Records"] cur_record_sequence_number = "" records.each do |record| begin # Process your record cur_record_sequence_number = record["dynamodb"]["SequenceNumber"] rescue StandardError => e # Return failed record's sequence number return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]} end end {"batchItemFailures" => []} end
Rust
SDK per Rust
Nota

C'è altro da sapere. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch di DynamoDB con Lambda utilizzando Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Parametri di configurazione di Flussi Amazon DynamoDB

Tutti i tipi di sorgenti di eventi Lambda condividono le stesse operazioni CreateEventSourceMappinge quelle dell'UpdateEventSourceMappingAPI. Tuttavia, solo alcuni dei parametri si applicano a DynamoDB Streams.

Parametri di origine eventi che si applicano a DynamoDB Streams
Parametro Obbligatorio Predefinito Note

BatchSize

N

100

Massimo: 10.000.

BisectBatchOnFunctionErrore

N

false

DestinationConfig

N

Una destinazione della coda standard di Amazon SQS o di un argomento standard di Amazon SNS per i record scartati.

Abilitato

N

true

EventSourceArn

Y

L'ARN del flusso dei dati o di un consumer di flusso.

FilterCriteria

N

Filtro eventi Lambda

FunctionName

Y

FunctionResponseTipologie

N

Per consentire alla funzione di segnalare errori specifici in un batch, includi il valore ReportBatchItemFailures in FunctionResponseTypes. Per ulteriori informazioni, consulta Segnalazione errori articoli batch.

MaximumBatchingWindowInSecondi

N

0

MaximumRecordAgeInSecondi

N

-1

-1 significa infinito: i record non riusciti vengono ritentati fino alla scadenza del record. Il limite di conservazione dei dati per DynamoDB Streams è di 24 ore.

Minimo: -1

Massimo: 604.800

MaximumRetryTentativi

N

-1

-1 sta per infinito: i record non riusciti vengono ripetuti fino alla scadenza del record

Minimo: 0

Massimo: 10.000.

ParallelizationFactor

N

1

Maximum: 10

StartingPosition

Y

TRIM_HORIZON o LATEST

TumblingWindowInSeconds

N

Minimo: 0

Massimo: 900