Tratamento de erros para uma origem de eventos do SQS no Lambda - AWS Lambda

Tratamento de erros para uma origem de eventos do SQS no Lambda

Para lidar com erros relacionados a uma fonte de eventos do SQS, o Lambda usa automaticamente uma estratégia de repetição com uma estratégia de recuo. Você também pode personalizar o comportamento de tratamento de erros configurando o mapeamento de origem de eventos do SQS para retornar respostas parciais em lote.

Estratégia de recuo para invocações com falha

Quando uma invocação falha, o Lambda tenta repetir a invocação enquanto implementa uma estratégia de recuo. A estratégia de recuo difere ligeiramente caso o Lambda tenha encontrado a falha devido a um erro no código da função ou devido ao controle de utilização.

  • Se o código da função causou o erro, o Lambda interromperá o processamento e repetirá a invocação. Enquanto isso, o Lambda recua gradualmente, reduzindo a quantidade de simultaneidade alocada ao mapeamento da origem do evento do Amazon SQS. Depois que limite de tempo de visibilidade da sua fila se esgotar, a mensagem será mostrada novamente na fila.

  • Se a invocação apresentar falhas devido ao controle de utilização, o Lambda recua gradualmente as novas tentativas, reduzindo a quantidade de simultaneidade alocada para o mapeamento da origem do evento do Amazon SQS. O Lambda continuará a repetir a mensagem até que o carimbo de data/hora da mensagem exceda o tempo limite de visibilidade da fila, que corresponde ao momento em que o Lambda descartará a mensagem.

Implementar respostas parciais em lote

Quando sua função do Lambda encontra um erro ao processar um lote, todas as mensagens nesse lote tornam-se novamente visíveis na fila por padrão, incluindo mensagens que o Lambda processou com sucesso. Como resultado, a função pode acabar processando a mesma mensagem diversas vezes.

Para não precisar reprocessar todas as mensagens processadas com êxito em um lote com falha, você pode configurar o mapeamento da origem do evento para tornar visíveis novamente apenas as mensagens com falha. Isso se chama resposta parcial em lote. Para ativar respostas parciais em lote, especifique ReportBatchItemFailures para a ação FunctionResponseTypes ao configurar o mapeamento da origem do evento. Isso permite que a função retorne um sucesso parcial, podendo ajudar a reduzir o número de novas tentativas desnecessárias nos registros.

Quando ReportBatchItemFailures estiver ativado, o Lambda não reduzirá a escala verticalmente da sondagem de mensagens quando as invocações de funções falharem. Se você espera que algumas mensagens falhem, e não quer que essas falhas afetem a taxa de processamento de mensagens, use ReportBatchItemFailures.

nota

Ao usar respostas parciais em lote, tenha em mente:

  • Se a sua função lançar uma exceção, o lote inteiro será considerado uma falha total.

  • Se você estiver usando esse recurso com uma fila FIFO, a função deverá interromper o processamento de mensagens após a primeira falha e retornar todas as mensagens com falha e não processadas no batchItemFailures. Isso ajuda a preservar a ordem das mensagens na sua fila.

Para ativar o relatório parcial em lote
  1. Leia as práticas recomendadas para implementar respostas parciais em lote.

  2. Execute o comando a seguir para ativar ReportBatchItemFailures para a função. Para recuperar o UUID do mapeamento da origem do evento, execute o comando list-event-source-mappings da AWS CLI.

    aws lambda update-event-source-mapping \ --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \ --function-response-types "ReportBatchItemFailures"
  3. Atualize o código da função para capturar todas as exceções e retornar mensagens com falha em uma resposta batchItemFailures do JSON. A resposta batchItemFailures deve incluir uma lista de IDs de mensagens, como valores itemIdentifier do JSON.

    Por exemplo, suponha que você tenha um lote de cinco mensagens, com os IDs de mensagem id1, id2, id3, id4 e id5. Sua função processa id1,id3 e id5 com sucesso. Para tornar as mensagens id2 e id4 novamente visíveis na fila, a função deve retornar a seguinte resposta:

    { "batchItemFailures": [ { "itemIdentifier": "id2" }, { "itemIdentifier": "id4" } ] }

    Veja alguns exemplos de código de função que retornam a lista de IDs de mensagens com falha no lote:

    .NET
    AWS SDK for .NET
    nota

    Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

    Relatar falhas de itens em lote do SQS com o Lambda usando o .NET.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using Amazon.Lambda.Core; using Amazon.Lambda.SQSEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace sqsSample; public class Function { public async Task<SQSBatchResponse> FunctionHandler(SQSEvent evnt, ILambdaContext context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new List<SQSBatchResponse.BatchItemFailure>(); foreach(var message in evnt.Records) { try { //process your message await ProcessMessageAsync(message, context); } catch (System.Exception) { //Add failed message identifier to the batchItemFailures list batchItemFailures.Add(new SQSBatchResponse.BatchItemFailure{ItemIdentifier=message.MessageId}); } } return new SQSBatchResponse(batchItemFailures); } private async Task ProcessMessageAsync(SQSEvent.SQSMessage message, ILambdaContext context) { if (String.IsNullOrEmpty(message.Body)) { throw new Exception("No Body in SQS Message."); } context.Logger.LogInformation($"Processed message {message.Body}"); // TODO: Do interesting work based on the new message await Task.CompletedTask; } }
    Go
    SDK para Go V2
    nota

    Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

    Relatar falhas de itens em lote do SQS com o Lambda usando Go.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) func handler(ctx context.Context, sqsEvent events.SQSEvent) (map[string]interface{}, error) { batchItemFailures := []map[string]interface{}{} for _, message := range sqsEvent.Records { if /* Your message processing condition here */ { batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": message.MessageId}) } } sqsBatchResponse := map[string]interface{}{ "batchItemFailures": batchItemFailures, } return sqsBatchResponse, nil } func main() { lambda.Start(handler) }
    Java
    SDK para Java 2.x
    nota

    Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

    Relatar falhas de itens em lote do SQS com o Lambda usando Java.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.SQSEvent; import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse; import java.util.ArrayList; import java.util.List; public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> { @Override public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) { List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<SQSBatchResponse.BatchItemFailure>(); String messageId = ""; for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) { try { //process your message messageId = message.getMessageId(); } catch (Exception e) { //Add failed message identifier to the batchItemFailures list batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId)); } } return new SQSBatchResponse(batchItemFailures); } }
    JavaScript
    SDK para JavaScript (v3)
    nota

    Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

    Relatar falhas de itens em lote do SQS com o Lambda usando JavaScript.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 export const handler = async (event, context) => { const batchItemFailures = []; for (const record of event.Records) { try { await processMessageAsync(record, context); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return { batchItemFailures }; }; async function processMessageAsync(record, context) { if (record.body && record.body.includes("error")) { throw new Error("There is an error in the SQS Message."); } console.log(`Processed message: ${record.body}`); }

    Relatar falhas de item em lote do SQS com o Lambda usando TypeScript.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import { SQSEvent, SQSBatchResponse, Context, SQSBatchItemFailure, SQSRecord } from 'aws-lambda'; export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => { const batchItemFailures: SQSBatchItemFailure[] = []; for (const record of event.Records) { try { await processMessageAsync(record); } catch (error) { batchItemFailures.push({ itemIdentifier: record.messageId }); } } return {batchItemFailures: batchItemFailures}; }; async function processMessageAsync(record: SQSRecord): Promise<void> { if (record.body && record.body.includes("error")) { throw new Error('There is an error in the SQS Message.'); } console.log(`Processed message ${record.body}`); }
    PHP
    SDK para PHP
    nota

    Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

    Relatar falhas de itens em lote do SQS com o Lambda usando PHP.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 <?php use Bref\Context\Context; use Bref\Event\Sqs\SqsEvent; use Bref\Event\Sqs\SqsHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends SqsHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleSqs(SqsEvent $event, Context $context): void { $this->logger->info("Processing SQS records"); $records = $event->getRecords(); foreach ($records as $record) { try { // Assuming the SQS message is in JSON format $message = json_decode($record->getBody(), true); $this->logger->info(json_encode($message)); // TODO: Implement your custom processing logic here } catch (Exception $e) { $this->logger->error($e->getMessage()); // failed processing the record $this->markAsFailed($record); } } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords SQS records"); } } $logger = new StderrLogger(); return new Handler($logger);
    Python
    SDK para Python (Boto3).
    nota

    Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

    Relatar falhas de itens em lote do SQS com o Lambda usando Python.

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response
    Ruby
    SDK para Ruby
    nota

    Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

    Relatar falhas de itens em lote do SQS com o Lambda usando Ruby.

    # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 require 'json' def lambda_handler(event:, context:) if event batch_item_failures = [] sqs_batch_response = {} event["Records"].each do |record| begin # process message rescue StandardError => e batch_item_failures << {"itemIdentifier" => record['messageId']} end end sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response end end
    Rust
    SDK para Rust
    nota

    Há mais no GitHub. Encontre o exemplo completo e saiba como configurar e executar no repositório dos Exemplos sem servidor.

    Relatar falhas de itens em lote do SQS com o Lambda usando Rust.

    // Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 use aws_lambda_events::{ event::sqs::{SqsBatchResponse, SqsEvent}, sqs::{BatchItemFailure, SqsMessage}, }; use lambda_runtime::{run, service_fn, Error, LambdaEvent}; async fn process_record(_: &SqsMessage) -> Result<(), Error> { Err(Error::from("Error processing message")) } async fn function_handler(event: LambdaEvent<SqsEvent>) -> Result<SqsBatchResponse, Error> { let mut batch_item_failures = Vec::new(); for record in event.payload.records { match process_record(&record).await { Ok(_) => (), Err(_) => batch_item_failures.push(BatchItemFailure { item_identifier: record.message_id.unwrap(), }), } } Ok(SqsBatchResponse { batch_item_failures, }) } #[tokio::main] async fn main() -> Result<(), Error> { run(service_fn(function_handler)).await }

Se os eventos com falha não retornarem à fila, consulte Como soluciono problemas da função do Lambda do SQS ReportBatchItemFailures? no Centro de Conhecimento da AWS.

Condições de sucesso e falha

O Lambda considera um lote um sucesso total quando a função retorna qualquer um dos seguintes:

  • Uma lista de batchItemFailures vazia

  • Uma lista de batchItemFailures nula

  • Uma EventResponse vazia

  • Uma EventResponse nula

O Lambda considera um lote uma falha total quando a função retorna qualquer um dos seguintes:

  • Uma resposta JSON inválida

  • Uma string itemIdentifier vazia

  • Uma itemIdentifier nula

  • Um itemIdentifier com um nome de chave inválido

  • Um valor itemIdentifier com um ID de mensagem inexistente

Métricas do CloudWatch

Para determinar se a função está reportando falhas de itens em lote corretamente, é possível monitorar as métricas do Amazon SQS NumberOfMessagesDeleted e ApproximateAgeOfOldestMessage no Amazon CloudWatch.

  • NumberOfMessagesDeleted rastreia o número de mensagens removidas da sua fila. Se o número cair para 0, é um sinal de que a resposta da função não está retornando corretamente mensagens com falha.

  • ApproximateAgeOfOldestMessage rastreia quanto tempo a mensagem mais antiga permaneceu na sua fila. Um aumento acentuado nessa métrica pode indicar que a função não está retornando mensagens com falha corretamente.