Uso de Lambda con Amazon SQS - AWS Lambda

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de Lambda con Amazon SQS

nota

Si desea enviar datos a un destino que no sea una función de Lambda o enriquecer los datos antes de enviarlos, consulte Amazon EventBridge Pipes (Canalizaciones de Amazon EventBridge).

Puede utilizar una función de Lambda para procesar mensajes en una cola de Amazon Simple Queue Service (Amazon SQS). Los mapeos de fuente de eventos de Lambda son compatibles con las colas estándar y las colas de primero en entrar, primero en salir (FIFO). Con Amazon SQS, puede descargar tareas de un componente de su aplicación enviándolas a una cola para, a continuación, procesarlas de forma asíncrona.

Lambda sondea la cola e invoca su función de Lambda sincrónicamente con un evento que contiene mensajes de cola. Lambda lee mensajes en lotes e invoca la función una vez por lote. Cuando la función procesa correctamente un lote, Lambda elimina sus mensajes de la cola.

Cuando Lambda lee un lote, los mensajes se mantienen en la cola, pero se ocultan durante el tiempo de espera de visibilidad de la cola. Cuando la función procesa correctamente un lote, Lambda elimina sus mensajes de la cola. De forma predeterminada, si la función detecta un error al procesar un lote, todos los mensajes de ese lote se vuelven a ver en la cola después de que expire el tiempo de visibilidad. Por este motivo, el código de su función debe ser capaz de procesar el mismo mensaje varias veces sin efectos secundarios no deseados.

aviso

Las asignaciones de orígenes de eventos de Lambda procesan cada evento al menos una vez y puede producirse un procesamiento duplicado de lotes. Para evitar posibles problemas relacionados con la duplicación de eventos, le recomendamos encarecidamente que haga que el código de la función sea idempotente. Para obtener más información, consulte ¿Cómo puedo hacer que mi función de Lambda sea idempotente? en el Centro de conocimientos de AWS.

Para evitar que Lambda procese un mensaje varias veces, puede configurar la asignación de orígenes de eventos para incluir los errores de los elementos del lote en la respuesta de la función, o puede utilizar la acción DeleteMessage de la API de Amazon SQS para eliminar los mensajes de la cola a medida que la función de Lambda los procese correctamente. Para obtener más información sobre el uso de la API de Amazon SQS, consulte la Referencia de la API del servicio de Amazon Simple Queue.

Ejemplo de evento de mensaje en cola estándar

ejemplo Evento de mensaje de Amazon SQS (cola estándar)
{ "Records": [ { "messageId": "059f36b4-87a3-44ab-83d2-661975830a7d", "receiptHandle": "AQEBwJnKyrHigUMZj6rYigCgxlaS3SLy0a...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082649183", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082649185" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" }, { "messageId": "2e1424d4-f796-459a-8184-9c92662be6da", "receiptHandle": "AQEBzWwaftRI0KuVm4tP+/7q1rGgNqicHq...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1545082650636", "SenderId": "AIDAIENQZJOLO23YVJ4VO", "ApproximateFirstReceiveTimestamp": "1545082650649" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:my-queue", "awsRegion": "us-east-2" } ] }

De forma predeterminada, Lambda sondea hasta 10 mensajes en su cola a la vez y envía ese lote a la función. Para evitar invocar a la función con un número de registros pequeño, puede indicar al origen del evento que almacene en búfer registros hasta 5 minutos configurando una ventana de lote. Antes de invocar la función, Lambda continúa el sondeo de los mensajes de la cola estándar de SQS hasta que caduque la ventana de lotes, se alcance la cuota de tamaño de carga de invocaciones o se alcance el tamaño de lote máximo configurado.

Si utiliza un intervalo de lote y la cola de SQS contiene muy poco tráfico, Lambda puede esperar hasta 20 segundos antes de invocar la función. Esto sucederá incluso si establece un intervalo de lote inferior a 20 segundos.

nota

En Java, es posible que se produzcan errores de puntero nulo al deserializar JSON. Esto podría deberse a la forma en que el mapeador de objetos JSON convierte las mayúsculas/minúsculas de “Records” y “EventSourceARN”.

Ejemplo de evento de mensajes de cola FIFO

Para las colas FIFO, los registros contienen atributos adicionales relacionados con la deduplicación y la secuenciación.

ejemplo Evento de mensaje de Amazon SQS (cola de FIFO)
{ "Records": [ { "messageId": "11d6ee51-4cc7-4302-9e22-7cd8afdaadf5", "receiptHandle": "AQEBBX8nesZEXmkhsmZeyIE8iQAMig7qw...", "body": "Test message.", "attributes": { "ApproximateReceiveCount": "1", "SentTimestamp": "1573251510774", "SequenceNumber": "18849496460467696128", "MessageGroupId": "1", "SenderId": "AIDAIO23YVJENQZJOL4VO", "MessageDeduplicationId": "1", "ApproximateFirstReceiveTimestamp": "1573251510774" }, "messageAttributes": {}, "md5OfBody": "e4e68fb7bd0e697a0ae8f1bb342846b3", "eventSource": "aws:sqs", "eventSourceARN": "arn:aws:sqs:us-east-2:123456789012:fifo.fifo", "awsRegion": "us-east-2" } ] }

Configurar una cola para utilizarla con Lambda

Cree una cola de SQS que sirva como fuente de eventos para la función de Lambda. A continuación, configure la cola para dar tiempo a su función de Lambda a procesar cada lote de eventos y para que Lambda vuelva a actuar en respuesta a los errores de limitación controlada a medida que escala verticalmente.

Para que su función tenga tiempo de procesar cada lote de registros, establezca el tiempo de espera de visibilidad de la cola de origen en al menos seis veces el tiempo de espera que configure en su función. El tiempo adicional permitirá a Lambda volver a intentar realizar los procesos en caso de que la función se vea limitada debido al procesamiento de un lote anterior.

Si su función no puede procesar un mensaje en repetidas ocasiones, Amazon SQS podrá enviar dicho mensaje a una cola de mensajes fallidos. Si la función devuelve un error, todos los elementos del lote vuelven a la cola. Después de que se supera el tiempo de espera de visibilidad, Lambda recibe el mensaje de nuevo. Para enviar mensajes a una segunda cola después de recibir varias veces, configure una cola de mensajes fallidos en la cola de origen.

nota

Asegúrese de configurar la cola de mensajes fallidos en la cola de origen y no en la función de Lambda. La cola de mensajes fallidos que configure en una función se utiliza para la cola de invocación asíncrona de la función, no para las colas de origen de eventos.

Si la función devuelve un error o no puede invocarse porque está en la máxima simultaneidad, es posible que el procesado tenga éxito tras algunos intentos adicionales. Para dar a los mensajes una oportunidad mejor de ser procesados antes de enviarlos a la cola de mensajes fallidos, establezca el maxReceiveCount de la política de redirección de la cola de origen a como mínimo 5.

Permisos de rol de ejecución

La política gestionada de AWS, AWSLambdaSQSQueueExecutionRole, contiene los permisos necesarios para que Lambda pueda leer de la cola de Amazon SQS. Agregar esta política gestionada al rol de ejecución de la función.

De manera opcional, si utiliza una cola cifrada, también debe agregar el siguiente permiso a su rol de ejecución:

Adición de permisos y creación de la asignación de orígenes de eventos

Cree un mapeo de origen de eventos para indicar a Lambda que envíe elementos desde una cola a una función de Lambda. Puede crear varios mapeos de orígenes de eventos para procesar elementos de varias colas con una sola función. Cuando Lambda invoca la función objetivo, el evento puede contener múltiples elementos, dependiendo del tamaño de lote máximo configurable.

Para configurar la función para que lea desde Amazon SQS, adjunte la política gestionada de AWS AWSLambdaSQSQueueExecutionRole al rol de ejecución y, a continuación, cree un desencadenador de SQS.

Cómo agregar permisos y crear un desencadenador
  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija el nombre de una función.

  3. Elija la pestaña Configuración y, a continuación, elija Permisos.

  4. En Nombre del rol, elija el enlace al rol de ejecución. Este enlace abre el rol en la consola de IAM.

    
              Enlace al rol de ejecución
  5. Elija Agregar permisos y luego Adjuntar políticas.

    
              Políticas adjuntas a la consola de IAM
  6. En el campo de búsqueda, escriba AWSLambdaSQSQueueExecutionRole. Agregue esta política al rol de ejecución. Se trata de una política administrada de AWS que contiene los permisos que la función necesita para leer desde una cola de Amazon SQS. Para obtener más información acerca de esta política, consulte AWSLambdaSQSQueueExecutionRole en la Referencia de políticas administradas de AWS.

  7. Regrese a la función en la consola de Lambda. En Descripción general de la función, elija Agregar desencadenador.

    
              Sección de descripción general de la consola de Lambda
  8. Elija un tipo de desencadenador.

  9. Configure las opciones requeridas y luego elija Agregar.

Lambda admite las siguientes opciones para los orígenes de eventos de Amazon SQS:

Cola de SQS

La cola de Amazon SQS desde la que leer registros.

Activar desencadenador

El estado de la asignación de origen de eventos. De forma predeterninada, la opción Enable trigger (Activar desencadenador) está seleccionada.

Tamaño de lote

El número máximo de registros que se enviarán a la función en cada lote. Para una cola estándar, puede tratarse de hasta 10 000 registros. Para una cola FIFO, el máximo es 10. Para un tamaño de lote superior a 10, también debe establecer la ventana del lote (MaximumBatchingWindowInSeconds) a al menos 1 segundo.

Configure el tiempo de espera de la función para que disponga de tiempo suficiente para procesar todo un lote de elementos. Si elementos tardan mucho tiempo en procesarse, elija un tamaño de lote más pequeño. Un tamaño de lote amplio puede mejorar la eficiencia para cargas de trabajo que son muy rápidas o tienen mucha administración. Si configura la simultaneidad reservada en la función, establezca al menos cinco ejecuciones simultáneas para reducir la posibilidad de errores de limitación controlada cuando Lambda invoque la función.

Lambda pasa todos los registros del lote a la función en una sola llamada, siempre y cuando el tamaño total de los eventos no exceda la cuota de tamaño de carga de invocaciones para la invocación síncrona (6 MB). Tanto Lambda como Amazon SQS generan metadatos por cada registro. Estos metadatos adicionales se cuentan como el tamaño total de la carga útil y pueden hacer que el número total de registros enviados en un lote sea inferior al tamaño del lote configurado. Los campos de metadatos que Amazon SQS envía pueden tener una longitud variable. Para obtener más información acerca de los campos de metadatos de Amazon SQS, consulte la documentación de operación de la API ReceiveMessage en la Referencia de la API de Amazon Simple Queue Service.

Ventana del lote

La cantidad de tiempo máxima para recopilar registros antes de invocar la función, en segundos. Esto se aplica únicamente a las colas estándar.

Si utiliza una ventana por lotes superior a 0 segundos, debe tener en cuenta el tiempo de procesamiento aumentado en el tiempo de espera de visibilidad de la cola. Recomendamos establecer el tiempo de espera de visibilidad de la cola hasta en seis veces el tiempo de espera, más el valor de MaximumBatchingWindowInSeconds. Esto permite tiempo para que su función de Lambda procese cada lote de eventos y vuelva a intentarlo en caso de un error de limitación.

Cuando los mensajes están disponibles, Lambda comienza a procesar los mensajes en lotes. Lambda comienza a procesar 5 lotes a la vez con 5 invocaciones simultáneas de la función. Si los mensajes siguen estando disponibles, Lambda agrega hasta 300 instancias más de la función por minuto, hasta un máximo de 1000 instancias de función. Para obtener más información sobre la simultaneidad y el escalado de funciones, consulte Escalado de la función de Lambda.

Para procesar más mensajes, puede optimizar la función de Lambda para obtener un mayor rendimiento. Consulte Entender cómo se escala AWS Lambda con las colas estándar de Amazon SQS.

Simultaneidad máxima

Número máximo de funciones simultáneas que puede invocar la fuente del evento. Para obtener más información, consulte Configuración de la simultaneidad máxima para los orígenes de eventos de Amazon SQS.

Criterios del filtro

Agregue criterios de filtrado para controlar qué eventos envía Lambda a su función para su procesamiento. Para obtener más información, consulte Filtrado de eventos de Lambda.

Escalado y procesamiento

Para las colas estándar, Lambda utiliza el sondeo largo para sondear una cola hasta que se active. Cuando los mensajes están disponibles, Lambda comienza a procesar cinco lotes a la vez con cinco invocaciones simultáneas de la función. Si los mensajes siguen estando disponibles, Lambda aumenta el número de procesos que son lotes de lectura hasta 300 instancias más por minuto. El número máximo de lotes que una asignación de origen de eventos puede procesar simultáneamente es 1 000.

Para colas FIFO, Lambda envía mensajes a su función en el orden en que los recibe. Cuando envíe un mensaje a una cola FIFO, especifique un ID de grupo de mensajes. Amazon SQS garantiza que los mensajes del mismo grupo se entreguen a Lambda en orden. Lambda ordena los mensajes en grupos y envía solo un lote a la vez para un grupo. Si la función devuelve un error, esta realiza todos los reintentos en los mensajes afectados antes de que Lambda reciba mensajes adicionales del mismo grupo.

Su función puede escalar simultáneamente al número de grupos de mensajes activos. Para obtener más información, consulte SQS FIFO como fuente de eventos en el blog de informática de AWS.

Configuración de la simultaneidad máxima para los orígenes de eventos de Amazon SQS

La configuración de la simultaneidad máxima limita el número de instancias simultáneas de la función que puede invocar un origen de eventos de Amazon SQS. La simultaneidad máxima es una configuración a nivel de origen de eventos. Si tiene varias fuentes de eventos de Amazon SQS asignación de orígenes de eventos, cada origen de eventos puede tener una configuración de simultaneidad máxima independiente. Puede utilizar la simultaneidad máxima para evitar que una cola utilice toda la simultaneidad reservada de la función o el resto de la cuota de simultaneidad de la cuenta. No hay ningún cargo por configurar la simultaneidad máxima en un origen de eventos de Amazon SQS.

Es importante destacar que la simultaneidad máxima y reservada son dos configuraciones independientes. No establezca una simultaneidad máxima superior a la reservada de la función. Si configura la simultaneidad máxima, asegúrese de que la simultaneidad reservada de la función es mayor o igual que la simultaneidad máxima total para todos los orígenes de eventos de Amazon SQS de la función. De lo contrario, Lambda podría limitar sus mensajes.

Si no se establece la simultaneidad máxima, Lambda puede escalar su fuente de eventos de Amazon SQS hasta la cuota de simultaneidad total de su cuenta, que es de 1000 de forma predeterminada.

nota

En el caso de las colas FIFO, las invocaciones simultáneas están limitadas por el número de ID de grupos de mensajes (messageGroupId) o por la configuración de simultaneidad máxima, lo que sea inferior. Por ejemplo, si tiene seis ID de grupos de mensajes y la simultaneidad máxima está establecida en 10, la función puede tener un máximo de seis invocaciones simultáneas.

Puede configurar la simultaneidad máxima en la asignación de orígenes de eventos nuevos y existentes de Amazon SQS.

Configure la simultaneidad máxima mediante la consola Lambda
  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija el nombre de una función.

  3. En Function overview (Descripción general de las funciones), elija SQS. Esta abre la pestaña Configuration (Configuración).

  4. Seleccione el activador de Amazon SQS y elija Edit (Editar).

  5. En Maximum concurrency (Simultaneidad máxima), introduzca un número entre 2 y 1000. Para desactivar la simultaneidad máxima, deje la casilla vacía.

  6. Seleccione Save (Guardar).

Configure la simultaneidad máxima mediante la AWS Command Line Interface (AWS CLI)

Utilice el comando asignación de orígenes de eventos con la opción --scaling-config. Ejemplo:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config '{"MaximumConcurrency":5}'

Para desactivar la simultaneidad máxima, introduzca un valor vacío para --scaling-config:

aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --scaling-config "{}"
Configure la simultaneidad máxima mediante la API de Lambda

Utilice las acciones de CreateEventSourceMapping o UpdateEventSourceMapping con un objeto ScalingConfig.

API de asignación de orígenes de eventos

Para administrar un origen de eventos con la AWS Command Line Interface (AWS CLI) o SDK de AWS, puede utilizar las siguientes operaciones de la API:

En el ejemplo siguiente se utiliza AWS CLI para mapear una función denominada my-function a una cola Amazon SQS especificada por su nombre de recurso de Amazon (ARN), con un tamaño de lote de 5 y una ventana de lote de 60 segundos.

aws lambda create-event-source-mapping --function-name my-function --batch-size 5 \ --maximum-batching-window-in-seconds 60 \ --event-source-arn arn:aws:sqs:us-east-2:123456789012:my-queue

Debería ver los siguientes datos de salida:

{ "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 5, "MaximumBatchingWindowInSeconds": 60, "EventSourceArn": "arn:aws:sqs:us-east-2:123456789012:my-queue", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "State": "Creating", "StateTransitionReason": "USER_INITIATED" }

Estrategia de retroceso para invocaciones fallidas

Cuando se produce un error en una invocación, Lambda intenta volver a intentar la invocación mientras implementa una estrategia de retraso. La estrategia de retroceso difiere ligeramente en función de si Lambda encontró el error debido a un error en el código de la función o a una limitación.

  • Si el código de la función provocó el error, Lambda dejará de procesar y volver a intentar la invocación. Mientras tanto, Lambda reduce de manera gradual el volumen de simultaneidad asignada a su asignación de orígenes de eventos de Amazon SQS. Cuando se agote el tiempo de espera de visibilidad de la cola, el mensaje volverá a aparecer en la cola.

  • Si la invocación falla debido a la limitación, Lambda retrocede gradualmente los reintentos reduciendo la cantidad de simultaneidad asignada a la asignación de orígenes de eventos de Amazon SQS. Lambda sigue reintentando enviar el mensaje hasta que la marca de tiempo del mensaje supere el tiempo límite de visibilidad de la cola, momento en el que Lambda descarta el mensaje.

Implementación de respuestas parciales por lotes

Cuando la función Lambda detecta un error al procesar un lote, todos los mensajes de ese lote se vuelven a ver en la cola de forma predeterminada, incluidos los mensajes que Lambda procesó correctamente. Como resultado, la función puede terminar procesando el mismo mensaje varias veces.

Para evitar el reprocesamiento de todos los mensajes procesados con éxito en un lote con errores, puede configurar la asignación de orígenes de eventos para que solo se vean de nuevo los mensajes fallidos. Esto se denomina respuesta parcial por lotes. Para activar las respuestas parciales por lotes, especifique ReportBatchItemFailures en la acción FunctionResponseTypes cuando configure la asignación de orígenes de eventos. Esto permite que la función devuelva un éxito parcial, lo que puede ayudar a reducir el número de reintentos innecesarios en los registros.

Cuando ReportBatchItemFailures está activado, Lambda no reduce verticalmente el sondeo de mensajes cuando fallan las invocaciones de las funciones. Utilice ReportBatchItemFailures, si espera que algunos mensajes fallen y no quiere que esos errores afecten a la velocidad de procesamiento de los mensajes.

nota

Tenga en cuenta las siguientes consideraciones al utilizar las respuestas parciales por lotes:

  • Si la función genera una excepción, todo el lote se considera un error completo.

  • Si utiliza esta característica con una cola FIFO, la función debe dejar de procesar los mensajes después del primer error y devolver todos los mensajes fallidos y sin procesar en batchItemFailures. Esto ayuda a preservar el orden de los mensajes en la cola.

Para activar los informes parciales por lotes
  1. Revise las Prácticas recomendadas para implementar las respuestas parciales por lotes.

  2. Ejecute el siguiente comando para activar ReportBatchItemFailures para la función. Para recuperar el UUID de la asignación de orígenes de eventos, ejecute el comando list-event-source-mappings de la AWS CLI.

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. Actualice el código de la función para detectar todas las excepciones y devolver los mensajes fallidos en una respuesta JSON batchItemFailures. La respuesta batchItemFailures debe incluir una lista de ID de mensajes, como valores JSON itemIdentifier.

    Supongamos que tiene un lote de cinco mensajes con ID de mensaje id1, id2, id3, id4 y id5. Su función procesa correctamente id1, id3 y id5. Para hacer que los mensajes id2 y id4 sean de nuevo visibles en la cola, la función debe devolver la siguiente respuesta:

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    Estos son algunos ejemplos de código de función que devuelven la lista de IDs de mensajes fallidos del lote:

    .NET
    AWS SDK for .NET
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante .NET.

    using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    SDK para Go V2
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Go.

    package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    SDK para Java 2.x
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Java.

    import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    SDK para JavaScript (v2)
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante JavaScript.

    export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante TypeScript.

    import { APIGatewayProxyEvent, APIGatewayProxyResult, Context } from 'aws-lambda'; export const handler = async (event: APIGatewayProxyEvent, context: Context): Promise<APIGatewayProxyResult> => { const batchItemFailures: { ItemIdentifier: string }[] = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ ItemIdentifier: record.messageId }); } } return { statusCode: 200, body: JSON.stringify({ batchItemFailures }), }; }; async function processMessageAsync(record: any, context: Context): Promise<void> { if (!record.body) { throw new Error('No Body in SQS Message.'); } context.log(`Processed message ${record.body}`); }
    PHP
    SDK para PHP
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante PHP.

    <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Handler as StdHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler implements StdHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handle(mixed $event, Context $context): array { $sqsEvent = new SqsEvent($event); $this->logger->info("Processing SQS records"); $records = $sqsEvent->getRecords(); $failedRecords = []; foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $failedRecords[] = $record->getMessageId(); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); // Format failures for the response $failures = array_map( fn(string $messageId) => ['itemIdentifier' => $messageId], $failedRecords ); return [ 'batchItemFailures' => $failures ]; } } $logger = new StderrLogger(); return new Handler($logger); ?>
    Python
    SDK para Python (Boto3)
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Python.

    import json def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    SDK para Ruby
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Ruby.

    require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    SDK para Rust
    nota

    Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

    Notificación de los errores de los elementos del lote de SQS con Lambda mediante Rust.

    use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

Si los eventos fallidos no vuelven a la cola, consulte ¿Cómo soluciono los problemas de la función de Lambda SQS ReportBatchItemFailures? en el Centro de conocimientos de AWS.

Condiciones de éxito y fracaso

Lambda trata un lote como un éxito completo si la función devuelve cualquiera de los siguientes elementos:

  • Una lista batchItemFailures vacía

  • Una lista batchItemFailures nula

  • Una EventResponse vacía

  • Una EventResponse nula

Lambda trata un lote como un error completo si la función devuelve cualquiera de los siguientes elementos:

  • Respuesta JSON no válida

  • Una cadena itemIdentifier vacía

  • Una itemIdentifier nula

  • Un itemIdentifier con un mal nombre de clave

  • Un valor itemIdentifier con un ID de mensaje que no existe

Métricas de CloudWatch

Para determinar si la función informa correctamente de los errores de elementos por lotes, puede monitorear las métricas Amazon SQS NumberOfMessagesDeleted y ApproximateAgeOfOldestMessage en Amazon CloudWatch.

  • NumberOfMessagesDeleted realiza un seguimiento del número de mensajes eliminados de la cola. Si cae a 0, es una señal de que la respuesta de la función no está devolviendo correctamente los mensajes fallidos.

  • ApproximateAgeOfOldestMessage hace un seguimiento de cuánto tiempo ha permanecido el mensaje más antiguo en la cola. Un aumento brusco de esta métrica puede indicar que la función no está devolviendo correctamente los mensajes fallidos.

Parámetros de configuración de Amazon SQS

Todos los tipos de fuente de eventos Lambda comparten las mismas operaciones CreateEventSourceMapping y UpdateEventSourceMapping de la API. Sin embargo, solo algunos de los parámetros se aplican a Amazon SQS.

Parámetros de fuente de eventos que se aplican a Amazon SQS
Parámetro Obligatoria Predeterminado Notas

BatchSize

N

10

Para colas estándar, el máximo es 10 000. Para colas FIFO, el máximo es 10.

Habilitado

N

true

EventSourceArn

Y

El ARN del flujo de datos o un consumidor de flujos

FunctionName

Y

FilterCriteria

N

Filtrado de eventos de Lambda

FunctionResponseTypes

N

Para permitir que la función informe de errores específicos de un lote, incluya el valor ReportBatchItemFailures en FunctionResponseTypes. Para obtener más información, consulte Implementación de respuestas parciales por lotes.

MaximumBatchingWindowInSeconds

N

0

ScalingConfig

N

Configuración de la simultaneidad máxima para los orígenes de eventos de Amazon SQS