Uso de AWS Lambda con Amazon DynamoDB - AWS Lambda

Uso de AWS Lambda con Amazon DynamoDB

nota

Si desea enviar datos a un destino que no sea una función de Lambda o enriquecer los datos antes de enviarlos, consulte Amazon EventBridge Pipes (Canalizaciones de Amazon EventBridge).

Puede utilizar una función AWS Lambda para procesar los registros de un flujo de Amazon DynamoDB. Con DynamoDB Streams, puede activar una función de Lambda para realizar trabajo adicional cada vez que se actualice una tabla de DynamoDB.

Lambda lee los registros de la secuencia e invoca la función sincrónicamente con un evento que contiene registros de flujo. Lambda lee los registros por lotes e invoca la función para procesar los registros del lote.

Evento de ejemplo

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525", "eventSource": "aws:dynamodb" } ]}

Flujos de sondeo y procesamiento por lotes

Lambda sondea las particiones del DynamoDB Stream y busca registros 4 veces por segundo. Cuando hay registros disponibles, Lambda invoca la función y espera el resultado. Si el procesamiento se realiza correctamente, Lambda reanuda el sondeo hasta que recibe más registros.

De forma predeterminada, Lambda invoca su función tan pronto como los registros estén disponibles. Si el lote que Lambda lee del origen de eventos solo tiene un registro, Lambda envía solo un registro a la función. Para evitar invocar la función con un número de registros pequeño, puede indicar al origen de eventos que almacene en búfer registros durante hasta 5 minutos configurando un plazo de procesamiento por lotes. Antes de invocar la función, Lambda continúa leyendo los registros del origen de eventos hasta que haya recopilado un lote completo, venza el plazo de procesamiento por lotes o el lote alcance el límite de carga de 6 MB. Para obtener más información, consulte Comportamiento de procesamiento por lotes.

aviso

Las asignaciones de orígenes de eventos de Lambda procesan cada evento al menos una vez, y puede producirse un procesamiento duplicado de registros. Para evitar posibles problemas relacionados con la duplicación de eventos, le recomendamos encarecidamente que haga que el código de la función sea idempotente. Para obtener más información, consulte ¿Cómo puedo hacer que mi función de Lambda sea idempotente? en el Centro de conocimientos de AWS.

Ajuste la configuración de ParallelizationFactor para procesar una partición de una secuencia de DynamoDB con más de una invocación de Lambda simultáneamente. Puede especificar el número de lotes simultáneos que Lambda sondea desde una partición a través de un factor de paralelización de 1 (predeterminado) a 10. Si aumenta el número de lotes simultáneos por partición, Lambda sigue garantizando el procesamiento en orden del nivel del elemento (partición y clave de clasificación).

Posiciones iniciales de flujos y sondeo

Tenga en cuenta que el sondeo de flujos durante la creación y las actualizaciones de la asignación de orígenes de eventos es, en última instancia, coherente.

  • Durante la creación de la asignación de orígenes de eventos, es posible que se demore varios minutos en iniciar el sondeo de los eventos del flujo.

  • Durante las actualizaciones de la asignación de orígenes de eventos, es posible que se demore varios minutos en detener y reiniciar el sondeo de los eventos del flujo.

Este comportamiento significa que, si especifica LATEST como posición inicial del flujo, la asignación de orígenes de eventos podría omitir eventos durante la creación o las actualizaciones. Para garantizar que no se pierda ningún evento, especifique la posición inicial del flujo como TRIM_HORIZON.

Lectores simultáneos de una partición en DynamoDB Streams

En el caso de las tablas de una sola región que no sean tablas globales, puede diseñar hasta dos funciones de Lambda para leer desde la misma partición de DynamoDB Streams al mismo tiempo. Si excede este límite, puede producirse una limitación controlada de las solicitudes. En el caso de las tablas globales, le recomendamos que limite el número de funciones simultáneas a uno para evitar la limitación de solicitudes.

Permisos de rol de ejecución

La política administrada de AWS, AWSLambdaDynamoDBExecutionRole, contiene los permisos necesarios que Lambda necesita leer de su flujo de DynamoDB. Agregar esta política gestionada al rol de ejecución de la función.

Para enviar registros de lotes con errores a una cola de SQS estándar o un tema de SNS estándar, la función necesita permisos adicionales. Cada servicio de destino requiere un permiso diferente, como se indica a continuación:

Adición de permisos y creación de la asignación de orígenes de eventos

Cree una asignación de orígenes de eventos para indicar a Lambda que envíe registros desde un flujo a una función de Lambda. Puede crear varias asignaciones de orígenes de eventos para procesar los mismos datos con distintas funciones de Lambda o para procesar elementos de varios flujos con una sola función.

Para configurar la función para que lea desde el flujo de DynamoDB, adjunte la política administrada de AWS AWSLambdaDynamoDBExecutionRole al rol de ejecución y, a continuación, cree un desencadenador de DynamoDB.

Cómo agregar permisos y crear un desencadenador
  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija el nombre de una función.

  3. Elija la pestaña Configuración y, a continuación, elija Permisos.

  4. En Nombre del rol, elija el enlace al rol de ejecución. Este enlace abre el rol en la consola de IAM.

    Enlace al rol de ejecución
  5. Elija Agregar permisos y luego Adjuntar políticas.

    Políticas adjuntas a la consola de IAM
  6. En el campo de búsqueda, escriba AWSLambdaDynamoDBExecutionRole. Agregue esta política al rol de ejecución. Se trata de una política administrada por AWS que contiene los permisos que la función necesita para leer desde un flujo de DynamoDB. Para obtener más información acerca de esta política, consulte AWSLambdaDynamoDBExecutionRole en la Referencia de políticas administradas de AWS.

  7. Regrese a la función en la consola de Lambda. En Descripción general de la función, elija Agregar desencadenador.

    Sección de descripción general de la consola de Lambda
  8. Elija un tipo de desencadenador.

  9. Configure las opciones requeridas y luego elija Add (Agregar).

Lambda admite las siguientes opciones para los orígenes de eventos de DynamoDB:

Opciones de origen de eventos
  • Tabla DynamoDB: la tabla de DynamoDB de la que leer registros.

  • Tamaño del lote: número de registros que se enviarán a la función en cada lote, hasta 10 000. Lambda pasa todos los registros del lote a la función en una sola llamada, siempre y cuando el tamaño total de los eventos no exceda el límite de carga para la invocación síncrona (6 MB).

  • Ventana de lote: especifique la cantidad de tiempo máxima para recopilar registros antes de invocar la función, en segundos.

  • Posición inicial: procesar solo los registros nuevos o todos los registros existentes.

    • Más recientes: procesar los registros nuevos que se agreguen al flujo principal.

    • Horizonte de supresión: procesar todos los registros del flujo.

    Tras procesar cualquier registro existente, la función es alcanzada y continúa procesando registros nuevos.

  • Destino en caso de error: una cola de SQS estándar o un tema de SNS estándar para los registros que no se puedan procesar. Cuando Lambda descarta un lote de registros demasiado antiguo o que ha agotado todos los reintentos, Lambda envía detalles sobre el lote a la cola o al tema.

  • Número de reintentos: número máximo de reintentos que Lambda realiza cuando la función devuelve un error. Esto no se aplica a errores de servicio o limitaciones controladas en los que el lote no alcanzó la función.

  • Edad máxima de registro: antigüedad máxima de un registro que Lambda envía a su función.

  • División del lote en caso de error: cuando la función devuelve un error, divida el lote en dos antes de volver a intentarlo. La configuración de tamaño de lote original permanece sin cambios.

  • Lotes simultáneos por partición: procese simultáneamente varios lotes desde la misma partición.

  • Habilitado: establézcalo en verdadero para habilitar la asignación de orígenes de eventos. Establézcalo en falso para detener el procesamiento de registros. Lambda toma nota del último registro procesado y sigue procesando desde ese punto cuando se habilita de nuevo el mapeo.

nota

No se cobran las llamadas a la API GetRecords invocadas por Lambda como parte de los desencadenadores de DynamoDB.

Para administrar la configuración de origen de evento más tarde, elija el desencadenador en el diseñador.

Control de errores

La gestión de errores en las asignaciones de orígenes de eventos de DynamoDB depende de si el error se produce antes de que se invoque la función o durante la invocación de la función:

Si las medidas de administración de errores fallan, Lambda descarta los registros y continúa procesando lotes del flujo. Con la configuración predeterminada, esto significa que un registro incorrecto puede bloquear el procesamiento en la partición afectada durante un máximo de un día. Para evitar esto, configure el mapeo de fuente de eventos de su función con un número razonable de reintentos y una antigüedad máxima de registro que se ajuste a su caso de uso.

Configuración de destinos para invocaciones fallidas

Para retener los registros de las invocaciones de asignación de orígenes de eventos fallidos, agregue un destino a la asignación de orígenes de eventos de su función. Cada registro enviado al destino es un documento JSON con metadatos sobre la invocación fallida. Puede configurar cualquier tema de Amazon SNS o cualquier cola de Amazon SQS como destino. Su rol de ejecución debe tener permisos para el destino:

Para configurar un destino en caso de error mediante la consola, siga estos pasos:

  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija una función.

  3. En Descripción general de la función, elija Agregar destino.

  4. En Origen, elija Invocación de asignación de orígenes de eventos.

  5. Para la Asignación de orígenes de eventos, elija un origen de eventos que esté configurado para esta función.

  6. En Condición, seleccione En caso de error. Para las invocaciones de asignación de orígenes de eventos, esta es la única condición aceptada.

  7. En Tipo de destino, elija el tipo de destino al que Lambda envía los registros de invocación.

  8. En Destino, elija un recurso.

  9. Seleccione Guardar.

También puede configurar un destino en caso de error mediante la AWS Command Line Interface (AWS CLI). Por ejemplo, el siguiente comando create-event-source-mapping agrega una asignación de orígenes de eventos con un destino de SQS en caso de error a MyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

El siguiente comando update-event-source-mapping actualiza una asignación de orígenes de eventos para enviar registros de invocación fallida a un destino de SNS después de dos intentos de reintento, o si los registros tienen más de una hora de antigüedad.

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 \ --maximum-record-age-in-seconds 3600 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'

La configuración actualizada se aplica de forma asincrónica y no se refleja en la salida hasta que se completa el proceso. Utilice el comando get-event-source-mapping para ver el estado actual.

Para eliminar un destino, introduzca una cadena vacía como argumento del parámetro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

El siguiente ejemplo muestra un registro de invocación para un flujo de DynamoDB Stream.

ejemplo Registro de invocación
{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

Puede utilizar esta información para recuperar los registros afectados del flujo para solucionar problemas. Los registros reales no están incluidos, por lo que debe procesar este registro y recuperarlos del flujo antes de que caduquen y se pierdan.

Métricas de Amazon CloudWatch

Lambda emite la métrica IteratorAge cuando la función termina de procesar un lote de registros. La métrica indica la antigüedad del último registro del lote cuando acabo el proceso. Si la función está procesando nuevos eventos, puede utilizar la antigüedad del iterador para estimar la latencia entre cuando un registro se añade y cuando la función lo procesa.

Una tendencia ascendente en la antigüedad del iterador puede indicar problemas con la función. Para obtener más información, consulte Uso de métricas de funciones de Lambda.

Ventanas de tiempo

Las funciones de Lambda pueden ejecutar aplicaciones de procesamiento de flujo continuo. Una secuencia representa datos ilimitados que fluyen de forma continua a través de su aplicación. Para analizar la información de esta entrada de actualización continua, puede enlazar los registros incluidos mediante una ventana definida en términos de tiempo.

Las ventanas de salto constante son ventanas de tiempo distintas que se abren y cierran a intervalos regulares. De forma predeterminada, las invocaciones de Lambda no tienen estado: no se pueden utilizar para procesar datos en múltiples invocaciones continuas sin una base de datos externa. Sin embargo, con las ventanas de salto constante, puede mantener su estado en todas las invocaciones. Este estado contiene el resultado agregado de los mensajes procesados previamente para la ventana actual. Su estado puede ser un máximo de 1 MB por partición. Si supera ese tamaño, Lambda finaliza la ventana antes de tiempo.

Cada registro de una secuencia pertenece a un periodo específico. Lambda procesará cada registro al menos una vez, pero no garantiza que cada registro se procese solo una vez. En casos excepcionales, como el manejo de errores, es posible que algunos registros se procesen más de una vez. Los registros siempre se procesan en orden la primera vez. Si los registros se procesan más de una vez, es posible que lo hagan de forma desordenada.

Agregación y procesamiento

Su función administrada por el usuario se invoca tanto para la agregación como para procesar los resultados finales de esa agregación. Lambda agrega todos los registros recibidos en la ventana. Puede recibir estos registros en varios lotes, cada uno como una invocación independiente. Cada invocación recibe un estado. Por lo tanto, al usar las ventanas de salto constante, su respuesta de la función de Lambda debe contener una propiedad de state. Si la respuesta no contiene una propiedad de state, Lambda considera que esto es una invocación fallida. Para satisfacer esta condición, la función puede devolver un objeto de TimeWindowEventResponse, que tiene la siguiente forma JSON:

ejemplo Valores TimeWindowEventResponse
{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
nota

Para las funciones Java, se recomienda utilizar un Map<String, String> para representar el estado.

Al final de la ventana, el indicador isFinalInvokeForWindow está configurado en true para indicar que este es el estado final y que está listo para su procesamiento. Después del procesamiento, la ventana se completa y su invocación final se completa, y luego se elimina el estado.

Al final de la ventana, Lambda utiliza el procesamiento final para las acciones en los resultados de agregación. Su procesamiento final se invoca sincrónicamente. Después de la invocación exitosa, los puntos de control de la función, el número de secuencia y el procesamiento de flujo continúa. Si la invocación no tiene éxito, su función de Lambda suspende el procesamiento posterior hasta una invocación exitosa.

ejemplo DynamodbTimeWindowEvent
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Configuración

Puede configurar ventanas de salto constante al crear o actualizar una asignación de orígenes de eventos. Para configurar una ventana de saltos de tamaño constante, especifique la ventana en segundos (TumblingWindowInSeconds). El siguiente comando de ejemplo AWS Command Line Interface (AWS CLI) crea una asignación de origen de eventos de streaming que tiene una ventana de salto constante de 120 segundos. Se nombra la función de Lambda definida para la agregación y el procesamiento se llama tumbling-window-example-function.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2024-06-10T19:26:16.525 \ --function-name tumbling-window-example-function \ --starting-position TRIM_HORIZON \ --tumbling-window-in-seconds 120

Lambda determina los límites de la ventana de salto constante en función de la hora en que se insertaron los registros en la secuencia. Todos los registros tienen una marca de hora aproximada disponible que Lambda utiliza en las determinaciones de límites.

Las agregaciones de ventanas de saltos constantes no admiten el reendurecimiento. Cuando el fragmento termina, Lambda considera la ventana cerrada y las particiones secundarias comienzan su propia ventana en un estado fresco.

Ventanas de saltos constantes son totalmente compatibles con las directivas de reintento existentes maxRetryAttempts y maxRecordAge.

ejemplo Handler.py: agregación y procesamiento

La siguiente función de Python muestra cómo agregar y luego procesar su estado final:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

Informes de fallos de elementos de lote

Al consumir y procesar datos de streaming desde un origen de eventos, de forma predeterminada, Lambda comprueba hasta el número de secuencia más alto de un lote solo cuando el lote se ha completado con éxito. Lambda trata todos los demás resultados como un error completo y vuelve a intentar procesar el lote hasta el límite de reintentos. Para permitir éxitos parciales al procesar lotes de una secuencia, active ReportBatchItemFailures. Permitir éxitos parciales puede ayudar a reducir el número de reintentos en un registro, aunque no impide por completo la posibilidad de reintentos en un registro exitoso.

Para activar ReportBatchItemFailures, incluya el valor enumerado ReportBatchItemFailures en la lista FunctionResponseTypes. Esta lista indica qué tipos de respuesta están habilitados para su función. Puede configurar esta lista al crear o actualizar una asignación de orígenes de eventos.

Sintaxis del informe

Al configurar los informes sobre errores de elementos por lotes, la clase StreamsEventResponse se devuelve con una lista de errores de elementos de lote. Puede utilizar un objeto StreamsEventResponse para devolver el número de secuencia del primer registro fallido del lote. También puede crear su propia clase personalizada usando la sintaxis de respuesta correcta. La siguiente estructura JSON muestra la sintaxis de respuesta requerida:

{ "batchItemFailures": [ { "itemIdentifier": "<SequenceNumber>" } ] }
nota

Si la matriz batchItemFailures contiene varios elementos, Lambda usa el registro con el número de secuencia más bajo como punto de control. Luego Lambda vuelve a probar todos los registros a partir de ese punto de control.

Condiciones de éxito y fracaso

Lambda trata un lote como un éxito completo si devuelve cualquiera de los siguientes elementos:

  • Una lista batchItemFailure vacía

  • Una lista batchItemFailure nula

  • Una EventResponse vacía

  • Un EventResponse nulo

Lambda trata un lote como un error completo si devuelve cualquiera de los siguientes elementos:

  • Una cadena itemIdentifier vacía

  • Una itemIdentifier nula

  • Un itemIdentifier con un mal nombre de clave

Lambda reintentos fallidos basados en su estrategia de reintento.

Bisecar un lote

Si su invocación falla y BisectBatchOnFunctionError está activada, el lote se divide en bisectos independientemente de su configuración ReportBatchItemFailures.

Cuando se recibe una respuesta de éxito parcial de lote y se activan tanto BisectBatchOnFunctionError como ReportBatchItemFailures, el lote se divide en el número de secuencia devuelto y Lambda vuelve a intentar solo los registros restantes.

Estos son algunos ejemplos de código de función que devuelven la lista de IDs de mensajes fallidos del lote:

.NET
AWS SDK for .NET
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public StreamsEventResponse FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new List<StreamsEventResponse.BatchItemFailure>(); StreamsEventResponse streamsEventResponse = new StreamsEventResponse(); foreach (var record in dynamoEvent.Records) { try { var sequenceNumber = record.Dynamodb.SequenceNumber; context.Logger.LogInformation(sequenceNumber); } catch (Exception ex) { context.Logger.LogError(ex.Message); batchItemFailures.Add(new StreamsEventResponse.BatchItemFailure() { ItemIdentifier = record.Dynamodb.SequenceNumber }); } } if (batchItemFailures.Count > 0) { streamsEventResponse.BatchItemFailures = batchItemFailures; } context.Logger.LogInformation("Stream processing complete."); return streamsEventResponse; } }
Go
SDK para Go V2
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type BatchItemFailure struct { ItemIdentifier string `json:"ItemIdentifier"` } type BatchResult struct { BatchItemFailures []BatchItemFailure `json:"BatchItemFailures"` } func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*BatchResult, error) { var batchItemFailures []BatchItemFailure curRecordSequenceNumber := "" for _, record := range event.Records { // Process your record curRecordSequenceNumber = record.Change.SequenceNumber } if curRecordSequenceNumber != "" { batchItemFailures = append(batchItemFailures, BatchItemFailure{ItemIdentifier: curRecordSequenceNumber}) } batchResult := BatchResult{ BatchItemFailures: batchItemFailures, } return &batchResult, nil } func main() { lambda.Start(HandleRequest) }
Java
SDK para Java 2.x
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Java.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse; import com.amazonaws.services.lambda.runtime.events.models.dynamodb.StreamRecord; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public StreamsEventResponse handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord : input.getRecords()) { try { //Process your record StreamRecord dynamodbRecord = dynamodbStreamRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(); } }
JavaScript
SDK para JavaScript (v3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante JavaScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event) => { const records = event.Records; let curRecordSequenceNumber = ""; for (const record of records) { try { // Process your record curRecordSequenceNumber = record.dynamodb.SequenceNumber; } catch (e) { // Return failed record's sequence number return { batchItemFailures: [{ itemIdentifier: curRecordSequenceNumber }] }; } } return { batchItemFailures: [] }; };

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante TypeScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { DynamoDBBatchResponse, DynamoDBBatchItemFailure, DynamoDBStreamEvent, } from "aws-lambda"; export const handler = async ( event: DynamoDBStreamEvent ): Promise<DynamoDBBatchResponse> => { const batchItemFailures: DynamoDBBatchItemFailure[] = []; let curRecordSequenceNumber; for (const record of event.Records) { curRecordSequenceNumber = record.dynamodb?.SequenceNumber; if (curRecordSequenceNumber) { batchItemFailures.push({ itemIdentifier: curRecordSequenceNumber, }); } } return { batchItemFailures: batchItemFailures }; };
PHP
SDK para PHP
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante PHP.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 <?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $dynamoDbEvent = new DynamoDbEvent($event); $this->logger->info("Processing records"); $records = $dynamoDbEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { $data = $record->getData(); $this->logger->info(json_encode($data)); // TODO: Do interesting work based on the new data } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getSequenceNumber(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords records"); // change format for the response $failures = array_map( fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK para Python (Boto3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Python.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}
Ruby
SDK para Ruby
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Ruby.

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event:, context:) records = event["Records"] cur_record_sequence_number = "" records.each do |record| begin # Process your record cur_record_sequence_number = record["dynamodb"]["SequenceNumber"] rescue StandardError => e # Return failed record's sequence number return {"batchItemFailures" => [{"itemIdentifier" => cur_record_sequence_number}]} end end {"batchItemFailures" => []} end
Rust
SDK para Rust
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Notificación de los errores de los elementos del lote de DynamoDB con Lambda mediante Rust.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::dynamodb::{Event, EventRecord, StreamRecord}, streams::{DynamoDbBatchItemFailure, DynamoDbEventResponse}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; /// Process the stream record fn process_record(record: &EventRecord) -> Result<(), Error> { let stream_record: &StreamRecord = &record.change; // process your stream record here... tracing::info!("Data: {:?}", stream_record); Ok(()) } /// Main Lambda handler here... async fn function_handler(event: LambdaEvent<Event>) -> Result<DynamoDbEventResponse, Error> { let mut response = DynamoDbEventResponse { batch_item_failures: vec![], }; let records = &event.payload.records; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(response); } for record in records { tracing::info!("EventId: {}", record.event_id); // Couldn't find a sequence number if record.change.sequence_number.is_none() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: Some("".to_string()), }); return Ok(response); } // Process your record here... if process_record(record).is_err() { response.batch_item_failures.push(DynamoDbBatchItemFailure { item_identifier: record.change.sequence_number.clone(), }); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return Ok(response); } } tracing::info!("Successfully processed {} record(s)", records.len()); Ok(response) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) // disable printing the name of the module in every log line. .with_target(false) // disabling time is handy because CloudWatch will add the ingestion time. .without_time() .init(); run(service_fn(function_handler)).await }

Parámetros de configuración de Amazon DynamoDB Streams

Todos los tipos de fuente de eventos Lambda comparten las mismas operaciones CreateEventSourceMapping y UpdateEventSourceMapping de la API. Sin embargo, solo algunos de los parámetros se aplican a Amazon DynamoDB Streams.

Parámetros de orígenes de eventos que se aplican a DynamoDB Streams
Parámetro Obligatoria Predeterminado Notas

BatchSize

N

100

Máximo: 10 000

BisectBatchOnFunctionError

N

false

DestinationConfig

N

Cola de Amazon SQS estándar o destino de tema de Amazon SNS estándar para registros descartados

Habilitado

N

true

EventSourceArn

Y

ARN del flujo de datos o un consumidor de flujos

FilterCriteria

N

Filtrado de eventos de Lambda

FunctionName

Y

FunctionResponseTypes

N

Para permitir que la función informe de errores específicos de un lote, incluya el valor ReportBatchItemFailures en FunctionResponseTypes. Para obtener más información, consulte Informes de fallos de elementos de lote.

MaximumBatchingWindowInSeconds

N

0

MaximumRecordAgeInSeconds

N

-1

-1 significa infinito: se vuelven a intentar los registros que han producido error hasta que caduque el registro. El límite de retención de datos del flujo de DynamoDB es de 24 horas.

Mínimo: -1

Máximo: 604 800

MaximumRetryAttempts

N

-1

-1 significa infinito: se vuelven a intentar los registros que han producido error hasta que caduque el registro

Mínimo: 0

Máximo: 10 000

ParallelizationFactor

N

1

Máximo: 10

StartingPosition

Y

TRIM_HORIZON o LATEST

TumblingWindowInSeconds

N

Mínimo: 0

Máximo: 900