Gestión de errores para un origen de eventos de SQS en Lambda - AWS Lambda

Gestión de errores para un origen de eventos de SQS en Lambda

Para gestionar los errores relacionados con un origen de eventos de SQS, Lambda utiliza automáticamente una estrategia de reintento con una estrategia de retroceso. También puede personalizar el comportamiento de la gestión de errores si configura la asignación de orígenes de eventos de SQS para que devuelva respuestas parciales por lotes.

Estrategia de retroceso para invocaciones fallidas

Cuando se produce un error en una invocación, Lambda intenta volver a intentar la invocación mientras implementa una estrategia de retraso. La estrategia de retroceso difiere ligeramente en función de si Lambda encontró el error debido a un error en el código de la función o a una limitación.

  • Si el código de la función provocó el error, Lambda dejará de procesar y volver a intentar la invocación. Mientras tanto, Lambda reduce de manera gradual el volumen de simultaneidad asignada a su asignación de orígenes de eventos de Amazon SQS. Cuando se agote el tiempo de espera de visibilidad de la cola, el mensaje volverá a aparecer en la cola.

  • Si la invocación falla debido a la limitación, Lambda retrocede gradualmente los reintentos reduciendo la cantidad de simultaneidad asignada a la asignación de orígenes de eventos de Amazon SQS. Lambda sigue reintentando enviar el mensaje hasta que la marca de tiempo del mensaje supere el tiempo límite de visibilidad de la cola, momento en el que Lambda descarta el mensaje.

Implementación de respuestas parciales por lotes

Cuando la función Lambda detecta un error al procesar un lote, todos los mensajes de ese lote se vuelven a ver en la cola de forma predeterminada, incluidos los mensajes que Lambda procesó correctamente. Como resultado, la función puede terminar procesando el mismo mensaje varias veces.

Para evitar el reprocesamiento de todos los mensajes procesados con éxito en un lote con errores, puede configurar la asignación de orígenes de eventos para que solo se vean de nuevo los mensajes fallidos. Esto se denomina respuesta parcial por lotes. Para activar las respuestas parciales por lotes, especifique ReportBatchItemFailures en la acción FunctionResponseTypes cuando configure la asignación de orígenes de eventos. Esto permite que la función devuelva un éxito parcial, lo que puede ayudar a reducir el número de reintentos innecesarios en los registros.

Cuando ReportBatchItemFailures está activado, Lambda no reduce verticalmente el sondeo de mensajes cuando fallan las invocaciones de las funciones. Utilice ReportBatchItemFailures, si espera que algunos mensajes fallen y no quiere que esos errores afecten a la velocidad de procesamiento de los mensajes.

nota

Tenga en cuenta las siguientes consideraciones al utilizar las respuestas parciales por lotes:

  • Si la función genera una excepción, todo el lote se considera un error completo.

  • Si utiliza esta característica con una cola FIFO, la función debe dejar de procesar los mensajes después del primer error y devolver todos los mensajes fallidos y sin procesar en batchItemFailures. Esto ayuda a preservar el orden de los mensajes en la cola.

Para activar los informes parciales por lotes
  1. Revise las Prácticas recomendadas para implementar las respuestas parciales por lotes.

  2. Ejecute el siguiente comando para activar ReportBatchItemFailures para la función. Para recuperar el UUID de la asignación de orígenes de eventos, ejecute el comando list-event-source-mappings de la AWS CLI.

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. Actualice el código de la función para detectar todas las excepciones y devolver los mensajes fallidos en una respuesta JSON batchItemFailures. La respuesta batchItemFailures debe incluir una lista de ID de mensajes, como valores JSON itemIdentifier.

    Supongamos que tiene un lote de cinco mensajes con ID de mensaje id1, id2, id3, id4 y id5. Su función procesa correctamente id1, id3 y id5. Para hacer que los mensajes id2 y id4 sean de nuevo visibles en la cola, la función debe devolver la siguiente respuesta:

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    Estos son algunos ejemplos de código de función que devuelven la lista de IDs de mensajes fallidos del lote:

    .NET
    AWS SDK for .NET
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante .NET.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    SDK para Go V2
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Go.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    SDK para Java 2.x
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Java.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    SDK para JavaScript (v3)
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante JavaScript.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante TypeScript.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda'; export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => { const batchItemFailures: SQSBatchItemFailure[] = []; for (const record of event.Records) { try { await processMessageAsync(record); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return {batchItemFailures: batchItemFailures}; }; async function processMessageAsync(record: SQSRecord): Promise<void> { if (record.body && record.body.includes("error")) { throw new Error('There is an error in the SQS Message.'); } console.log(`Processed message ${record.body}`); }
    PHP
    SDK para PHP
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante PHP.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Sqs\SqsHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends SqsHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleSqs(SqsEvent $event, Context $context): void { $this->logger->info("Processing SQS records"); $records = $event->getRecords(); foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $this->markAsFailed($record); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); } } $logger = new StderrLogger(); return new Handler($logger);
    Python
    SDK para Python (Boto3)
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Python.

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    SDK para Ruby
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Ruby.

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    SDK para Rust
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Rust.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

Si los eventos fallidos no vuelven a la cola, consulte ¿Cómo soluciono los problemas de la función de Lambda SQS ReportBatchItemFailures? en el Centro de conocimientos de AWS.

Condiciones de éxito y fracaso

Lambda trata un lote como un éxito completo si la función devuelve cualquiera de los siguientes elementos:

  • Una lista batchItemFailures vacía

  • Una lista batchItemFailures nula

  • Una EventResponse vacía

  • Una EventResponse nula

Lambda trata un lote como un error completo si la función devuelve cualquiera de los siguientes elementos:

  • Respuesta JSON no válida

  • Una cadena itemIdentifier vacía

  • Una itemIdentifier nula

  • Un itemIdentifier con un mal nombre de clave

  • Un valor itemIdentifier con un ID de mensaje que no existe

Métricas de CloudWatch

Para determinar si la función informa correctamente de los errores de elementos por lotes, puede monitorear las métricas Amazon SQS NumberOfMessagesDeleted y ApproximateAgeOfOldestMessage en Amazon CloudWatch.

  • NumberOfMessagesDeleted realiza un seguimiento del número de mensajes eliminados de la cola. Si cae a 0, es una señal de que la respuesta de la función no está devolviendo correctamente los mensajes fallidos.

  • ApproximateAgeOfOldestMessage hace un seguimiento de cuánto tiempo ha permanecido el mensaje más antiguo en la cola. Un aumento brusco de esta métrica puede indicar que la función no está devolviendo correctamente los mensajes fallidos.