Uso de AWS Lambda con Amazon Kinesis - AWS Lambda

Uso de AWS Lambda con Amazon Kinesis

Puede utilizar una función de AWS Lambda para procesar los registros de un flujo de datos de Amazon Kinesis.

Un flujo de datos de Kinesis es un conjunto de particiones. Cada partición contiene una secuencia de registros de datos. Un consumidor es una aplicación que procesa los datos procedentes de un flujo de datos de Kinesis. Puede asignar una función Lambda a un consumidor de rendimiento compartido (iterador estándar) o a un consumidor de rendimiento dedicado con distribución ramificada mejorada.

Para iteradores estándar, Lambda sondea cada partición de la secuencia de Kinesis en busca de registros utilizando el protocolo HTTP. El mapeo de origen de eventos comparte el rendimiento de lectura con otros consumidores de la partición.

Para minimizar la latencia y maximizar el rendimiento de lectura, puede crear un consumidor de flujo de datos con distribución ramificada mejorada. Los consumidores de flujos obtienen una conexión dedicada a cada partición que no afecta a las demás aplicaciones que leen el flujo. El rendimiento dedicado puede ser útil si hay muchas aplicaciones que leen los mismos datos, o si se está reprocesando un flujo con registros de gran tamaño. Kinesis inserta los registros en Lambda a través de HTTP/2.

Para obtener información detallada sobre los flujos de datos de Kinesis, consulte Lectura de datos de Amazon Kinesis Data Streams.

Lambda lee registros del flujo de datos e invoca la función sincrónicamente con un evento que contenga registros de flujo. Lambda lee registros en lotes e invoca la función para procesar registros desde el lote.

ejemplo Evento de registro de Kinesis

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

De forma predeterminada, Lambda llama a su función en cuanto los registros están disponibles en el flujo. Si el lote que Lambdalee desde la secuencia solo tiene un registro en él, Lambda envía solo un registro a la función. Para evitar invocar a la función con un número de registros pequeño, puede indicar al origen del evento que almacene en búfer registros hasta 5 minutos configurando una ventana de lote. Antes de invocar a la función, Lambda sigue leyendo registros desde el flujo hasta que haya recopilado un lote completo o hasta que caduca la ventana de lote.

Si la función devuelve un error, Lambda volverá a intentar ejecutar el lote hasta que el procesamiento se realice correctamente o los datos caduquen. Para evitar particiones detenidas, puede configurar la asignación del origen de eventos para que vuelva a intentarlo con un tamaño de lote menor, para que limite el número de reintentos o para que se descarten los registros que sean muy antiguos. Si desea conservar los eventos descartados, puede configurar la asignación del origen de eventos para que envíe información sobre los lotes con errores a una cola de SQS o un tema de SNS.

También puede aumentar la simultaneidad procesando varios lotes de cada partición en paralelo. Lambda puede procesar hasta 10 lotes en cada partición simultáneamente. Si aumenta el número de lotes simultáneos en cada partición, Lambda seguirá garantizando que el procesamiento se realiza en orden en el nivel de la clave de partición.

Configure las opciones de ParallelizationFactor para procesar una partición de una secuencia de datos de Kinesis o DynamoDB con más de una invocación Lambda simultáneamente. Se puede especificar el número de lotes simultáneos que Lambda examina en una partición a través de un factor de paralelización que oscila entre 1 (predeterminado) y 10. Por ejemplo, cuando ParallelizationFactor se establece en 2, puede tener 200 invocaciones de Lambda simultáneas como máximo para procesar 100 particiones de datos de Kinesis. Esto ayuda a ampliar el rendimiento de procesamiento cuando el volumen de datos es volátil y el valor de IteratorAge es alto. Para obtener más información, consulte Nuevos controles de escalado de AWS Lambda para orígenes de eventos de Kinesis y DynamoDB.

Configurar su flujo de datos y función.

Su función de Lambda es una aplicación consumidora para su flujo de datos. Procesa un lote de registros a la vez desde cada partición. Puede mapear una función de Lambda a un flujo de datos (iterador estándar) o a un consumidor de un flujo (distribución ramificada mejorada).

Para los iteradores estándar, Lambda sondea cada partición del flujo de Kinesis y busca registros una vez por segundo. Cuando hay más registros disponibles, Lambda sigue procesando lotes hasta que la función se pone al día con el flujo. El mapeo de origen de eventos comparte el rendimiento de lectura con otros consumidores de la partición.

Para minimizar la latencia y maximizar el rendimiento de lectura, cree un consumidor de flujo de datos con distribución ramificada mejorada. Los consumidores con distribución ramificada mejorada obtienen una conexión dedicada a cada partición que no afecta a las demás aplicaciones que leen el flujo. Los consumidores de flujos utilizan HTTP/2 para reducir la latencia enviando los registros a Lambda a través de una conexión de larga duración y mediante la compresión de los encabezados de las solicitudes. Es posible crear un consumidor de flujos con la API RegisterStreamConsumer de Kinesis.

aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Debería ver los siguientes datos de salida:

{ "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

Para incrementar la velocidad con la que la función procesa los registros, añada particiones al flujo de datos. Lambda procesa los registros de cada partición por orden. Deja de procesar registros adicionales en una partición si la función devuelve un error. Al haber más particiones, se procesan más lotes simultáneamente, lo que reduce el impacto de los errores de simultaneidad.

Si la función no puede aumentar para administrar el número total de lotes simultáneos, solicite un aumento de cuota o reserve la simultaneidad para la función.

Permisos de rol de ejecución

Lambda necesita los siguientes permisos para administrar los recursos relacionados con el flujo de datos de Kinesis. Añádalos a rol de ejecución de su función.

La política administrada AWSLambdaKinesisExecutionRole contiene los permisos siguientes: Para obtener más información, consulte Rol de ejecución de AWS Lambda.

Para enviar registros de un lote con errores a una cola o un tema, la función necesita permisos adicionales. Cada servicio de destino requiere un permiso diferente, como se indica a continuación:

Configuración de una secuencia como un origen de eventos

Cree un mapeo de origen de eventos para indicar a Lambda que envíe registros de un flujo de datos a una función de Lambda. Puede crear varios mapeos de orígenes de eventos para procesar los mismos datos con distintas funciones de Lambda o para procesar elementos de varios flujos de datos con una sola función.

Para configurar la función para leer de Kinesis en la consola de Lambda, cree un desencadenador de Kinesis.

Para crear un disparador

  1. Abra la Página de funciones en la consola de Lambda.

  2. Elija una función.

  3. En Function overview (Descripción general de la función), elija Add trigger (Agregar disparador).

  4. Elija un tipo de disparador.

  5. Configure las opciones necesarias y, a continuación, elija Add (Añadir).

Lambda admite las siguientes opciones para los orígenes de eventos de Kinesis.

Opciones de origen de eventos

  • Kinesis stream (Flujo de Kinesis): el flujo de – Kinesis cuyos registros se leen.

  • Consumer (Consumidor)– (opcional): utilizar un consumidor de flujos para leer los datos del flujo a través de una conexión dedicada.

  • Batch size (Tamaño del lote): número de registros que hay que enviar a la función en cada lote, hasta 10 000. Lambda pasa todos los registros del lote a la función en una sola llamada, siempre que el tamaño total de los eventos no supere el límite de carga para la invocación asíncrona (6 MB).

  • Batch window (Ventana de lote): especifique la cantidad de tiempo máxima para recopilar registros antes de invocar la función, en segundos.

  • Starting position (Posición inicial): procesar solo los registros nuevos, todos los registros existentes o los registros creados después de cierta fecha.

    • Latest (Más recientes): procesar los registros nuevos que se añaden al flujo principal.

    • Trim horizon (Horizonte de supresión): procesar todos los registros del flujo.

    • At timestamp (En marca temporal): procesar los registros comenzando a partir de un momento determinado.

    Tras procesar cualquier registro existente, la función es alcanzada y continúa procesando registros nuevos.

  • On-failure destination (Destino en caso de error): cola de SQS o tema de SNS de los registros que no pueden procesarse. Cuando Lambda descarta un lote de registros porque es demasiado antiguo o se han agotado todos los reintentos, envía información sobre el lote a la cola o al tema.

  • Retry attempts (Número de reintentos): número máximo de reintentos que Lambda realiza cuando la función devuelve un error. No se aplica a los errores ni las limitaciones de los servicios en los que el lote no alcanza la función.

  • Maximum age of record (Antigüedad máxima del registro): antigüedad máxima de un registro que Lambda envía a la función.

  • Split batch on error (Dividir lote en caso de error): cuando la función devuelve un error, el lote se divide en dos antes de intentarlo de nuevo.

  • Process multiple batches from the same shard concurrently (Lotes simultáneos por partición): procesa simultáneamente varios lotes de la misma partición.

  • Enabled (Habilitado): establecer en true para habilitar la asignación de origen de eventos. Establecer en false el origen de eventos para dejar de procesar registros. Lambda toma nota del último registro procesado y sigue procesando desde ese punto cuando se habilita de nuevo.

nota

Kinesis cobra por cada partición y, para una distribución ramificada mejorada, por los datos leídos desde la frecuencia. Para obtener más información sobre precios, consulte precios de Amazon Kinesis.

Para administrar la configuración de origen de evento más tarde, elija el desencadenador en el diseñador.

API de mapeo de origen de eventos

Para administrar un origen de eventos con la AWS CLI o el SDK de AWS, puede utilizar las siguientes operaciones de la API:

Para crear el mapeo de orígenes de eventos con la AWS CLI, use el comando create-event-source-mapping. En el siguiente ejemplo, se utiliza la AWS CLI para asignar una función denominada my-function a un flujo de datos de Kinesis. El flujo de datos se especifica mediante un nombre de recurso de Amazon (ARN) con un tamaño de lote de 500, comenzando desde una marca temporal en tiempo Unix.

aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream

Debería ver los siguientes datos de salida:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Para utilizar un consumidor, especifique el ARN del consumidor en lugar del ARN del flujo.

Configure otras opciones para personalizar el modo en que se procesan los lotes y especificar cuándo deben descargarse los registros que no se pueden procesar. En el siguiente ejemplo, se actualiza la asignación de un origen de eventos para enviar un registro de error a una cola de SQS después de dos reintentos o si los registros tienen más de una hora de antigüedad.

aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}'

Debería consultar esta salida:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

La configuración actualizada se aplica de forma asincrónica y no se refleja en la salida hasta que se completa el proceso. Utilice el comando get-event-source-mapping para ver el estado actual.

aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b

Debería consultar esta salida:

{ "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

Para procesar varios lotes simultáneamente, utilice la opción --parallelization-factor.

aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

Control de errores

La asignación del origen de eventos que lee los registros de la secuencia de Kinesis invoca la función de forma sincrónica y reintenta la operación si se producen errores. Si la función se ve limitada o el servicio de Lambda devuelve un error sin invocar la función, Lambda vuelve a intentarlo hasta que el registro expira o supera la antigüedad máxima establecida en la asignación del origen de eventos.

Si la función recibe los registros pero devuelve un error, Lambda lo vuelve a intentar hasta que vencen los registros del lote, se supera la antigüedad máxima o se alcanza la cuota de reintentos especificada. En el caso de los errores de funciones, también puede configurar la asignación del origen de eventos para que divida en dos los lotes que presentan errores. Al intentarlo con lotes más pequeños, se aíslan los registros erróneos y se solucionan problemas relacionadas con el tiempo de espera. Los lotes divididos no se tienen en cuenta para calcular la cuota de reintentos.

Si las medidas de administración de errores no funcionan, Lambda descarta los registros y sigue procesando los lotes de la secuencia. Con la configuración predeterminada, esto significa que un registro erróneo puede bloquear el procesamiento de la partición afectada durante un máximo de one week. Para evitar esto, configure la asignación del origen de eventos de la función con un número razonable de reintentos y una antigüedad de registro máxima que se ajuste a su caso de uso.

Para mantener un registro de los lotes descartados, configure un destino de evento fallido. Lambda enviará un documento a la cola o al tema de destino con información sobre el lote.

Para configurar un destino para registros de eventos con errores

  1. Abra la Página de funciones en la consola de Lambda.

  2. Elija una función.

  3. En Function overview (Descripción general de la función), elija Add destination (Agregar destino).

  4. En Source (Origen), elija Stream invocation (Invocación por secuencias).

  5. En Stream (Secuencia), elija una secuencia que esté asignada a la función.

  6. En Destination type (Tipo de destino), elija el tipo de recurso que recibe el registro de invocación.

  7. En Destination (Destino), elija un recurso.

  8. Seleccione Save.

El siguiente ejemplo muestra un registro de invocación para un flujo de Kinesis.

ejemplo Registro de invocación

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

Puede utilizar esta información para recuperar los registros afectados de la secuencia y solucionar los problemas. Los registros reales no están incluidos, por lo que debe procesar este registro y recuperarlos de la secuencia antes de que venzan y se pierdan.

Métricas de Amazon CloudWatch

Lambda emite la métrica IteratorAge cuando la función termina de procesar un lote de registros. La métrica indica la antigüedad del último registro del lote cuando acabo el proceso. Si la función está procesando nuevos eventos, puede utilizar la antigüedad del iterador para estimar la latencia entre el momento en que se añade un registro y el momento en que la función lo procesa.

Una tendencia ascendente en la antigüedad del iterador puede indicar problemas con la función. Para obtener más información, consulte Trabajar con métricas de funciones de AWS Lambda.

Ventanas de tiempo

Las funciones Lambda pueden ejecutar aplicaciones de procesamiento de flujo continuo. Una secuencia representa un datos ilimitados que fluyen de forma continua a través de su aplicación. Para analizar la información de esta entrada de actualización continua, puede enlazar los registros incluidos mediante una ventana definida en términos de tiempo.

Las invocaciones Lambda no tienen estado: no se pueden utilizar para procesar datos en múltiples invocaciones continuas sin una base de datos externa. Sin embargo, con la ventana habilitada, puede mantener su estado en todas las invocaciones. Este estado contiene el resultado agregado de los mensajes procesados previamente para la ventana actual. Su estado puede ser un máximo de 1 MB por fragmento. Si supera ese tamaño, Lambda finaliza la ventana antes de tiempo.

Ventanas de saltos de tamaño constante

Las funciones Lambda pueden agregar datos usando ventanas de salto constante: ventanas de tiempo distintas que se abren y cierran a intervalos regulares. Las ventanas de salto constante le permiten procesar orígenes de datos de transmisión a través de ventanas de tiempo contiguas y no superpuestas.

Cada registro de una secuencia pertenece a una ventana específica. Un registro se procesa sólo una vez, cuando Lambda procesa la ventana a la que pertenece el registro. En cada ventana, puede realizar cálculos, como una suma o un promedio, en el nivel de clave de partición dentro de un fragmento.

Agregación y procesamiento

Su función administrada por el usuario se invoca tanto para la agregación como para procesar los resultados finales de esa agregación. Lambda agrega todos los registros recibidos en la ventana. Puede recibir estos registros en varios lotes, cada uno como una invocación independiente. Cada invocación recibe un estado. También puede procesar registros y devolver un nuevo estado, que se pasa en la siguiente invocación. Lambda devuelve un TimeWindowEventResponse en JSON en el siguiente formato:

ejemplo Valores TimeWindowEventReponse

{ "state": { "1": 282, "2": 715 }, "batchItemFailures": [] }
nota

Para las funciones Java, se recomienda utilizar un mapa<String, String> para representar el estado.

Al final de la ventana, el indicador isFinalInvokeForWindow está configurado en true para indicar que este es el estado final y que está listo para su procesamiento. Después del procesamiento, la ventana se completa y su invocación final se completa, y luego se elimina el estado.

Al final de la ventana, Lambda utiliza el procesamiento final para las acciones en los resultados de agregación. Su procesamiento final se invoca sincrónicamente. Después de la invocación exitosa, los puntos de control de la función el número de secuencia y el procesamiento de flujo continúa. Si la invocación no tiene éxito, su función Lambda suspende el procesamiento posterior hasta una invocación exitosa.

ejemplo KinesisTimeWindowEvent

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1607497475.000 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role", "awsRegion": "us-east-1", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream" } ], "window": { "start": "2020-12-09T07:04:00Z", "end": "2020-12-09T07:06:00Z" }, "state": { "1": 282, "2": 715 }, "shardId": "shardId-000000000006", "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream", "isFinalInvokeForWindow": false, "isWindowTerminatedEarly": false }

Configuración

Puede configurar ventanas de salto constante al crear o actualizar una asignación de origen de eventos. Para configurar una ventana de salto constante, especifique la ventana en segundos. El siguiente comando de ejemplo AWS Command Line Interface (AWS CLI) crea una asignación de origen de eventos de transmisión que tiene una ventana de salto constante de 120 segundos. Se nombra la función Lambda definida para la agregación y el procesamiento se llama tumbling-window-example-function.

aws lambda create-event-source-mapping --event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream --function-name "arn:aws:lambda:us-east-1:123456789018:function:tumbling-window-example-function" --region us-east-1 --starting-position TRIM_HORIZON --tumbling-window-in-seconds 120

Lambda determina los límites de la ventana de salto constante en función de la hora en que se insertaron los registros en la secuencia. Todos los registros tienen una marca de hora aproximada disponible que Lambda utiliza en las determinaciones de límites.

Las agregaciones de ventanas de saltos constantes no admiten el reendurecimiento. Cuando el fragmento termina, Lambda considera la ventana cerrada y los fragmentos secundarios comienzan su propia ventana en un estado fresco.

Ventanas de saltos constantes son totalmente compatibles con las directivas de reintento existentes maxRetryAttempts y maxRecordAge.

ejemplo Handler.py – Agregación y procesamiento

La siguiente función de Python muestra cómo agregar y luego procesar su estado final:

def lambda_handler(event, context): print('Incoming event: ', event) print('Incoming state: ', event['state']) #Check if this is the end of the window to either aggregate or process. if event['isFinalInvokeForWindow']: # logic to handle final state of the window print('Destination invoke') else: print('Aggregate invoke') #Check for early terminations if event['isWindowTerminatedEarly']: print('Window terminated early') #Aggregation logic state = event['state'] for record in event['Records']: state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1 print('Returning state: ', state) return {'state': state}

Informes de fallos de artículos de lote

Al consumir y procesar datos de transmisión desde un origen de eventos, de forma predeterminada Lambda los puntos de control hasta el número de secuencia más alto de un lote sólo cuando el lote es un éxito completo. Lambda trata todos los demás resultados como un error completo y reintenta procesar el lote hasta el límite de reintento. Para permitir éxitos parciales al procesar lotes de una secuencia, active ReportBatchItemFailures. Permitir éxitos parciales puede ayudar a reducir el número de reintentos en un registro, aunque no impide por completo la posibilidad de reintentos en un registro exitoso.

Para activar ReportBatchItemFailures, incluya el valor enumerado ReportBatchItemFailures en la lista FunctionResponseTypes. Esta lista indica qué tipos de respuesta están habilitados para su función. Puede configurar esta lista al crear o actualizar una asignación de origen de eventos.

Sintaxis del informe

Al configurar los informes sobre errores de elementos por lotes, la clase StreamsEventResponse se devuelve con una lista de errores de elementos de lote. Puede utilizar un objeto StreamsEventResponse para devolver el número de secuencia del primer registro fallido del lote. También puede crear su propia clase personalizada usando la sintaxis de respuesta correcta. La siguiente estructura JSON muestra la sintaxis de respuesta requerida:

{ "batchItemFailures": [ { "itemIdentifier": "<id>" } ] }

Condiciones de éxito y fracaso

Lambda trata un lote como un éxito completo si devuelve cualquiera de los siguientes elementos:

  • Una lista batchItemFailure vacía

  • Una lista batchItemFailure nula

  • Un EventResponse vacío

  • Un EventResponse nulo

Lambda trata un lote como un error completo si devuelve cualquiera de los siguientes elementos:

  • Una itemIdentifier cadena vacía

  • Un itemIdentifier nulo

  • Un itemIdentifier con un mal nombre de clave

Lambda reintentos fallidos basados en su estrategia de reintento.

Bisecar un lote

Si su invocación falla y BisectBatchOnFunctionError está activada, el lote se divide en bisectos independientemente de su configuración ReportBatchItemFailures.

Cuando se recibe una respuesta de éxito parcial de lote y ReportBatchItemFailures se activan ambos BisectBatchOnFunctionError y se activan, el lote se divide en el número de secuencia devuelto y Lambda vuelve a intentar solo los registros restantes.

Java

ejemplo Handler.java – devuelve el nuevo StreamSeventResponse ()

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KinesisEvent; import java.io.Serializable; import java.util.ArrayList; import java.util.List; public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, Serializable> { @Override public Serializable handleRequest(KinesisEvent input, Context context) { List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<*>(); String curRecordSequenceNumber = ""; for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) { try { //Process your record KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis(); curRecordSequenceNumber = kinesisRecord.getSequenceNumber(); } catch (Exception e) { //Return failed record's sequence number batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber)); return new StreamsEventResponse(batchItemFailures); } } return new StreamsEventResponse(batchItemFailures); } }
Python

ejemplo Handler.py – devuelve BatchItemFailures []

def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}