Uso de AWS Lambda con Amazon DynamoDB - AWS Lambda

Uso de AWS Lambda con Amazon DynamoDB

Puede utilizar una función AWS Lambda para procesar los registros de un flujo de Amazon DynamoDB. Con DynamoDB Streams, puede activar una función de Lambda para realizar trabajo adicional cada vez que se actualice una tabla de DynamoDB.

Lambda lee los registros de la secuencia e invoca la función sincrónicamente con un evento que contiene registros de flujo. Lambda lee los registros por lotes e invoca la función para procesar los registros del lote.

ejemplo Evento de registro de DynamoDB Streams

{ "Records": [ { "eventID": "1", "eventVersion": "1.0", "dynamodb": { "Keys": { "Id": { "N": "101" } }, "NewImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES", "SequenceNumber": "111", "SizeBytes": 26 }, "awsRegion": "us-west-2", "eventName": "INSERT", "eventSourceARN": eventsourcearn, "eventSource": "aws:dynamodb" }, { "eventID": "2", "eventVersion": "1.0", "dynamodb": { "OldImage": { "Message": { "S": "New item!" }, "Id": { "N": "101" } }, "SequenceNumber": "222", "Keys": { "Id": { "N": "101" } }, "SizeBytes": 59, "NewImage": { "Message": { "S": "This item has changed" }, "Id": { "N": "101" } }, "StreamViewType": "NEW_AND_OLD_IMAGES" }, "awsRegion": "us-west-2", "eventName": "MODIFY", "eventSourceARN": sourcearn, "eventSource": "aws:dynamodb" }

Lambda sondea las particiones del DynamoDB Stream y busca registros 4 veces por segundo. Cuando hay registros disponibles, Lambda invoca la función y espera el resultado. Si el procesamiento se realiza correctamente, Lambda reanuda el sondeo hasta que recibe más registros.

De forma predeterminada, Lambda invoca su función tan pronto como los registros estén disponibles en el flujo. Si el lote que Lambda lee de la secuencia solo tiene un registro en él, Lambda envía solo un registro a la función. Para evitar invocar a la función con un número de registros pequeño, puede indicar al origen del evento que almacene en búfer registros hasta 5 minutos configurando una ventana de lote. Antes de invocar la función, Lambda sigue leyendo los registros del flujo hasta que haya recopilado un lote completo o hasta que caduque la ventana del lote.

Sin embargo, si la función devuelve un error, Lambda vuelve a intentar ejecutar el lote hasta que el procesamiento vaya satisfactoriamente o los datos caduquen. Para evitar particiones estancadas, puede configurar el mapeo de fuente de eventos para que lo vuelva a intentar con un tamaño de lote menor, limite el número de reintentos o descarte los registros que sean demasiado antiguos. Para conservar eventos descartados, puede configurar el mapeo de fuente de eventos para enviar detalles sobre los lotes fallidos a una cola de SQS o tema SNS.

También puede aumentar la concurrencia procesando varios lotes de cada partición en paralelo. Lambda puede procesar hasta 10 lotes en cada partición simultáneamente. Si aumenta el número de lotes simultáneos por partición, Lambda sigue garantizando el procesamiento en orden a nivel de clave de partición.

Configuración de la configuración de ParallelizationFactor para procesar una partición de un flujo de datos Kinesis o DynamoDB con más de una invocación Lambda simultáneamente. Puede especificar el número de lotes simultáneos que Lambda sondea desde una partición a través de un factor de paralelización de 1 (predeterminado) a 10. Por ejemplo, cuando ParallelizationFactor se establece en 2, puede tener 200 invocaciones Lambda simultáneas como máximo para procesar 100 particiones de datos de Kinesis. Esto ayuda a escalar el rendimiento de procesamiento cuando el volumen de datos es volátil y el IteratorAge es alto. Tenga en cuenta que el factor de paralelización no funcionará si está utilizando la agregación de Kinesis. Para obtener más información, consulte Nuevos controles de escalado de AWS Lambda para las fuentes de eventos de Kinesis y DynamoDB.

Permisos de rol de ejecución

Lambda necesita los siguientes permisos para administrar los recursos relacionados con DynamoDB Stream. Añada dichos permisos al rol de ejecución de su función.

La política administrada AWSLambdaDynamoDBExecutionRole contiene los permisos siguientes: Para obtener más información, consulte AWS LambdaRol de ejecución de .

Para enviar registros de lotes fallidos a una cola o tema, la función necesita permisos adicionales. Cada servicio de destino requiere un permiso diferente, como se indica a continuación:

Configuración de una secuencia como un origen de eventos

Cree un mapeo de fuente de eventos para indicar a Lambda que envíe registros desde un flujo a una función de Lambda. Puede crear varios mapeos de fuentes de eventos para procesar los mismos datos con distintas funciones de Lambda o para procesar elementos de varios flujos con una sola función.

Para configurar la función para que lea desde DynamoDB Streams en la consola de Lambda, cree un desencadenador de DynamoDB.

Para crear un disparador

  1. Abra la Functions page (Página de funciones) en la consola de Lambda.

  2. Elija una función.

  3. En Function overview (Descripción general de la función), elija Add trigger (Agregar disparador).

  4. Elija un tipo de desencadenador.

  5. Configure las opciones requeridas y luego elija Adde (Agregar).

Lambda admite las siguientes opciones para las fuentes de eventos de DynamoDB.

Opciones de origen de eventos

  • Tabla DynamoDB: la tabla de DynamoDB de la que leer registros.

  • Tamaño del lote: número de registros que se enviarán a la función en cada lote, hasta 10 000. Lambda pasa todos los registros del lote a la función en una sola llamada, siempre y cuando el tamaño total de los eventos no exceda el límite de carga para la invocación síncrona (6 MB).

  • Ventana de lote: especifique la cantidad de tiempo máxima para recopilar registros antes de invocar la función, en segundos.

  • Posición inicial: procesar solo los registros nuevos o todos los registros existentes.

    • Más recientes: procesar los registros nuevos que se agreguen al flujo principal.

    • Horizonte de supresión: procesar todos los registros del flujo.

    Tras procesar cualquier registro existente, la función es alcanzada y continúa procesando registros nuevos.

  • Destino en caso de fallo: una cola de SQS o un tema SNS para los registros que no se pueden procesar. Cuando Lambda descarta un lote de registros porque es demasiado antiguo o ha agotado todos los reintentos, envía detalles sobre el lote a la cola o tema.

  • Número de reintentos: número máximo de reintentos que Lambda realiza cuando la función devuelve un error. Esto no se aplica a errores de servicio o limitaciones controladas en los que el lote no alcanzó la función.

  • Edad máxima de registro: antigüedad máxima de un registro que Lambda envía a su función.

  • División del lote en caso de error: cuando la función devuelve un error, divida el lote en dos antes de volver a intentarlo.

  • Lotes simultáneos por partición: procese varios lotes desde la misma partición simultáneamente.

  • Habilitado: establecer en verdadero para habilitar el mapeo de fuente de eventos. Establecer en false para detener el procesamiento de registros. Lambda toma nota del último registro procesado y sigue procesando desde ese punto cuando se habilita de nuevo el mapeo.

nota

No se cobran las llamadas a la API GetRecords invocadas por Lambda como parte de los desencadenadores de DynamoDB.

Para administrar la configuración de origen de evento más tarde, elija el desencadenador en el diseñador.

API de mapeo de origen de eventos

Para administrar un oa fuente gen de eventos con la AWS CLI o AWS SDK, puede utilizar las siguientes operaciones de la API:

En el siguiente ejemplo se utiliza la AWS CLI para asignar una función llamada my-function a una cola de DynamoDB especificada mediante su nombre de recurso de Amazon (ARN), con un tamaño de lote de 500.

aws lambda create-event-source-mapping --function-name my-function --batch-size 500 --starting-position LATEST \ --event-source-arn arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525

Debería ver los siguientes datos de salida:

{ "UUID": "14e0db71-5d35-4eb5-b481-8945cf9d10c2", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1560209851.963, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Configure opciones adicionales para personalizar como la forma en que se procesan los lotes y especificar cuándo descartar los registros que no se pueden procesar. En el ejemplo siguiente se actualiza un mapeo de fuente de eventos para enviar un registro de error a una cola de SQS después de dos intentos de reintento, o si los registros tienen más de una hora de antigüedad.

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

Debería ver esta salida:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

La configuración actualizada se aplica de forma asincrónica y no se refleja en la salida hasta que se completa el proceso. Utilice el comando get-event-source-mapping para ver el estado actual.

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

Debería ver esta salida:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:dynamodb:us-east-2:123456789012:table/my-table/stream/2019-06-10T19:26:16.525", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

Para procesar varios lotes de forma simultánea, utilice la opción --parallelization-factor.

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

Control de errores

El mapeo de fuente de eventos que lee los registros del flujo de DynamoDB Stream invoca la función de forma sincrónica y reintenta los errores. Si la función está restringida o el servicio Lambda devuelve un error sin invocar la función, Lambda vuelve a intentarlo hasta que los registros caduquen o superen la antigüedad máxima que configura en el mapeo de fuente del evento.

Si la función recibe los registros pero devuelve un error, Lambda vuelve a intentarlo hasta que los registros del lote caduquen, superen la antigüedad máxima o alcancen la cuota de reintento configurada. En el caso de errores de función, también puede configurar el mapeo de fuente de eventos para dividir un lote fallido en dos lotes. Reintentar con lotes más pequeños aísla los registros defectuosos y soluciona problemas de tiempo de espera. Para la cuota de reintento, no se tiene en cuenta la división de un lote.

Si las medidas de administración de errores fallan, Lambda descarta los registros y continúa procesando lotes del flujo. Con la configuración predeterminada, esto significa que un registro incorrecto puede bloquear el procesamiento en la partición afectada durante un máximo de un día. Para evitar esto, configure el mapeo de fuente de eventos de su función con un número razonable de reintentos y una antigüedad máxima de registro que se ajuste a su caso de uso.

Para conservar una copia de eventos descartados, configure un destino de eventos fallidos. Lambda envía un documento a la cola o al tema de destino con detalles sobre el lote.

Para configurar un destino para registros de eventos con errores

  1. Abra la Functions page (Página de funciones) en la consola de Lambda.

  2. Elija una función.

  3. En Function overview (Descripción general de la función), elija Add destination (Agregar destino).

  4. En Source (Origen), elija Stream invocation (Invocación por secuencias).

  5. En Stream (Secuencia), elija una secuencia que esté asignada a la función.

  6. En Destination type (Tipo de destino), elija el tipo de recurso que recibe el registro de invocación.

  7. En Destination (Destino), elija un recurso.

  8. Seleccione Save.

El siguiente ejemplo muestra un registro de invocación para un flujo de DynamoDB Stream.

ejemplo Registro de invocación

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:13:49.717Z", "DDBStreamBatchInfo": { "shardId": "shardId-00000001573689847184-864758bb", "startSequenceNumber": "800000000003126276362", "endSequenceNumber": "800000000003126276362", "approximateArrivalOfFirstRecord": "2019-11-14T00:13:19Z", "approximateArrivalOfLastRecord": "2019-11-14T00:13:19Z", "batchSize": 1, "streamArn": "arn:aws:dynamodb:us-east-2:123456789012:table/mytable/stream/2019-11-14T00:04:06.388" } }

Puede utilizar esta información para recuperar los registros afectados del flujo para solucionar problemas. Los registros reales no están incluidos, por lo que debe procesar este registro y recuperarlos del flujo antes de que caduquen y se pierdan.

Métricas de Amazon CloudWatch

Lambda emite la métrica IteratorAge cuando la función termina de procesar un lote de registros. La métrica indica la antigüedad del último registro del lote cuando acabo el proceso. Si la función está procesando nuevos eventos, puede utilizar la antigüedad del iterador para estimar la latencia entre cuando un registro se añade y cuando la función lo procesa.

Una tendencia ascendente en la antigüedad del iterador puede indicar problemas con la función. Para obtener más información, consulte Trabajar con métricas de funciones de AWS Lambda.

Ventanas de tiempo

Las funciones de Lambda pueden ejecutar aplicaciones de procesamiento de flujo continuo. Una secuencia representa un datos ilimitados que fluyen de forma continua a través de su aplicación. Para analizar la información de esta entrada de actualización continua, puede enlazar los registros incluidos mediante una ventana definida en términos de tiempo.

Las ventanas de salto constante son ventanas de tiempo distintas que se abren y cierran a intervalos regulares. De forma predeterminada, las invocaciones de Lambda no tienen estado: no se pueden utilizar para procesar datos en múltiples invocaciones continuas sin una base de datos externa. Sin embargo, con las ventanas de salto constante, puede mantener su estado en todas las invocaciones. Este estado contiene el resultado agregado de los mensajes procesados previamente para la ventana actual. Su estado puede ser un máximo de 1 MB por fragmento. Si supera ese tamaño, Lambda finaliza la ventana antes de tiempo.

Cada registro de una secuencia pertenece a una ventana específica. Un registro se procesa solo una vez, cuando Lambda procesa la ventana a la que pertenece el registro. En cada ventana, puede realizar cálculos, como una suma o un promedio, en el nivel de clave de partición dentro de un fragmento.

Agregación y procesamiento

Su función administrada por el usuario se invoca tanto para la agregación como para procesar los resultados finales de esa agregación. Lambda agrega todos los registros recibidos en la ventana. Puede recibir estos registros en varios lotes, cada uno como una invocación independiente. Cada invocación recibe un estado. Por lo tanto, al usar las ventanas de salto constante, su respuesta de la función de Lambda debe contener una propiedad de state. Si la respuesta no contiene una propiedad de state, Lambda considera que esto es una invocación fallida. Para satisfacer esta condición, la función puede devolver un objeto de TimeWindowEventResponse, que tiene la siguiente forma JSON:

ejemplo TimeWindowEventResponseValores

{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
nota

Para las funciones Java, se recomienda utilizar un Map<String, String> para representar el estado.

Al final de la ventana, el indicador isFinalInvokeForWindow está configurado en true para indicar que este es el estado final y que está listo para su procesamiento. Después del procesamiento, la ventana se completa y su invocación final se completa, y luego se elimina el estado.

Al final de la ventana, Lambda utiliza el procesamiento final para las acciones en los resultados de agregación. Su procesamiento final se invoca sincrónicamente. Después de la invocación exitosa, los puntos de control de la función el número de secuencia y el procesamiento de flujo continúa. Si la invocación no tiene éxito, su función Lambda suspende el procesamiento posterior hasta una invocación exitosa.

ejemplo DynamodbTimeWindowEvent

{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ], "window": { "start": "2020-07-30T17:00:00Z", "end": "2020-07-30T17:05:00Z" }, "state": { "1": "state1" }, "shardId": "shard123456789", "eventSourceARN": "stream-ARN", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Configuration

Puede configurar ventanas de salto constante al crear o actualizar una asignación de origen de eventos. Para configurar una ventana de salto constante, especifique la ventana en segundos. El siguiente comando de ejemplo AWS Command Line Interface (AWS CLI) crea una asignación de origen de eventos de transmisión que tiene una ventana de salto constante de 120 segundos. Se nombra la función de Lambda definida para la agregación y el procesamiento se llama tumbling-window-example-function.

aws lambda create-event-source-mapping --event-source-arn arn:aws:dynamodb:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda determina los límites de la ventana de salto constante en función de la hora en que se insertaron los registros en la secuencia. Todos los registros tienen una marca de hora aproximada disponible que Lambda utiliza en las determinaciones de límites.

Las agregaciones de ventanas de saltos constantes no admiten el reendurecimiento. Cuando el fragmento termina, Lambda considera la ventana cerrada y las particiones secundarios comienzan su propia ventana en un estado fresco.

Ventanas de saltos constantes son totalmente compatibles con las directivas de reintento existentes maxRetryAttempts y maxRecordAge.

ejemplo Handler.py: agregación y procesamiento

La siguiente función de Python muestra cómo agregar y luego procesar su estado final:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['dynamodb']['NewImage']['Id']] = state.get(record['dynamodb']['NewImage']['Id'], 0) + 1 print('Returning state: ', state) return {'state': state}

Informes de fallos de artículos de lote

Al consumir y procesar datos de streaming desde una fuente de eventos, de forma predeterminada los puntos de control de Lambda hasta el número de secuencia más alto de un lote solo cuando el lote es un éxito completo. Lambda trata todos los demás resultados como un error completo y vuelve a intentar procesar el lote hasta el límite de reintentos. Para permitir éxitos parciales al procesar lotes de una secuencia, active ReportBatchItemFailures. Permitir éxitos parciales puede ayudar a reducir el número de reintentos en un registro, aunque no impide por completo la posibilidad de reintentos en un registro exitoso.

Para activar ReportBatchItemFailures, incluya el valor enumerado ReportBatchItemFailures en la lista FunctionResponseTypes. Esta lista indica qué tipos de respuesta están habilitados para su función. Puede configurar esta lista al crear o actualizar una asignación de origen de eventos.

Sintaxis del informe

Al configurar los informes sobre errores de elementos por lotes, la clase StreamsEventResponse se devuelve con una lista de errores de elementos de lote. Puede utilizar un objeto StreamsEventResponse para devolver el número de secuencia del primer registro fallido del lote. También puede crear su propia clase personalizada usando la sintaxis de respuesta correcta. La siguiente estructura JSON muestra la sintaxis de respuesta requerida:

{ "batchItemFailures": [ { "itemIdentifier": "<id>" } ] }

Condiciones de éxito y fracaso

Lambda trata un lote como un éxito completo si devuelve cualquiera de los siguientes elementos:

  • Una lista batchItemFailure vacía

  • Una lista batchItemFailure nula

  • Un vací EventResponse

  • Un nul EventResponse

Lambda trata un lote como un error completo si devuelve cualquiera de los siguientes elementos:

  • Una cadena vací itemIdentifier

  • Un nul itemIdentifier

  • Un itemIdentifier con un mal nombre de clave

Lambda reintentos fallidos basados en su estrategia de reintento.

Bisecar un lote

Si su invocación falla y BisectBatchOnFunctionError está activada, el lote se divide en bisectos independientemente de su configuración ReportBatchItemFailures.

Cuando se recibe una respuesta de éxito parcial de lote y se activan tanto BisectBatchOnFunctionError como ReportBatchItemFailures, el lote se divide en el número de secuencia devuelto y Lambda vuelve a intentar solo los registros restantes.

Java

ejemplo Handler.java: devuelve el nuevo StreamsEventResponse()

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessDynamodbRecords implements RequestHandler<DynamodbEvent, Serializable> { @Override public Serializable handleRequest(DynamodbEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<*>(); String curRecordSequenceNumber = ""; for (DynamodbEvent.DynamodbEventRecord dynamodbEventRecord : input.getRecords()) { try { //Process your record DynamodbEvent.Record dynamodbRecord = dynamodbEventRecord.getDynamodb(); curRecordSequenceNumber = dynamodbRecord.getSequenceNumber(); } catch (Exception e) { //Return failed record's sequence number batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
Python

ejemplo Handler.py: devuelve BatchItemFailures[]

def handler(event, context): records = event.get("Records") curRecordSequenceNumber = ""; for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}