Uso do AWS Lambda com o Amazon Kinesis - AWS Lambda

Uso do AWS Lambda com o Amazon Kinesis

Você pode usar uma função do AWS Lambda para processar registros em um streaming de dados do Amazon Kinesis. Com o Kinesis, você pode coletar dados de várias fontes e processá-los com vários consumidores. O Lambda oferece suporte a iteradores de streaming de dados padrão e consumidores de fluxo HTTP/2.

O Lambda lê registros do streaming de dados e invoca sua função de forma síncrona com um evento que contém registros de fluxo. O Lambda lê registros em lotes e invoca sua função para processar registros do lote.

exemplo Evento de registro do Kinesis

{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ] }

Se você tiver vários aplicativos que estão lendo registros do mesmo fluxo, pode usar os consumidores de fluxo do Kinesis em vez de iteradores padrão. Os consumidores têm taxa de transferência dedicada de leitura para que não seja necessário competir com outros consumidores dos mesmos dados. Com os consumidores, o Kinesis envia registros para o Lambda por meio de uma conexão HTTP/2, que também pode reduzir a latência entre a adição de um registro e invocação da função.

nota

As configurações de manipulação de erros e simultaneidade não estão disponíveis para consumidores de stream do HTTP/2.

Por padrão, o Lambda invoca sua função assim que os registros estão disponíveis no fluxo. Se o lote que ele ler do fluxo tiver somente um registro, o Lambda enviará apenas um registro à função. Para evitar invocar a função com um número pequeno de registros, você pode dizer à origem dos eventos para fazer o buffer dos registros por até cinco minutos, configurando uma janela de lote. Antes de invocar a função, o Lambda continua lendo os registros do fluxo até reunir um lote completo ou até a janela do lote expirar.

Se a sua função retornar um erro, o Lambda tentará executar novamente o lote até que o processamento seja bem-sucedido ou os dados expirem. Para evitar estilhaços paralisados, você pode configurar o mapeamento de origem de evento para tentar ser executado novamente com um tamanho de lote menor, limitar o número de novas tentativas ou descartar os registros muito antigos. Para manter os eventos descartados, é possível configurar o mapeamento de origem de evento para enviar detalhes sobre lotes com falha para uma fila do SQS ou um tópico do SNS.

Você também pode aumentar a simultaneidade processando vários lotes de cada estilhaço em paralelo. O Lambda pode processar até 10 lotes em cada estilhaço simultaneamente. Se você aumentar o número de lotes simultâneos por estilhaço, o Lambda ainda garantirá o processamento na ordem no nível de chave de partição.

Configurar o stream de dados e a função

Sua função do Lambda é um aplicativo de consumidor para seu streaming de dados. Ele processa um lote de registros por vez de cada estilhaço. Você pode mapear uma função do Lambda para um stream de dados (iterador padrão) ou para um consumidor de um stream (distribuição avançada).

Para os iteradores padrão, o Lambda sonda cada estilhaço em seu fluxo do Kinesis para identificar registros a uma taxa básica de uma vez por segundo. Quando houver mais registros disponíveis, o Lambda mantém lotes de processamento até que a função alcance o fluxo. O mapeamento da origem do evento compartilha a taxa de transferência de leitura com outros consumidores do fragmento.

Para minimizar a latência e maximizar a taxa de transferência de leitura, crie um consumidor de streaming de dados. Os consumidores de fluxo obtêm uma conexão dedicada para cada estilhaço que não afeta outros aplicativos que fazem leitura do fluxo. A taxa de transferência dedicada pode ajudar se você tiver muitos aplicativos lendo os mesmos dados, ou se você estiver reprocessando um fluxo com grandes registros.

Os consumidores de fluxos usam HTTP/2 para reduzir a latência, enviando os registros para o Lambda por meio de uma conexão de longa duração e compactando cabeçalhos de solicitação. Você pode criar um consumidor de fluxo com a APIRegisterStreamConsumer do Kinesis.

$ aws kinesis register-stream-consumer --consumer-name con1 \ --stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "Consumer": { "ConsumerName": "con1", "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608", "ConsumerStatus": "CREATING", "ConsumerCreationTimestamp": 1540591608.0 } }

Para aumentar a velocidade com que sua função processa registros, adicione estilhaços ao fluxo de dados. O Lambda processa registros em cada estilhaço em ordem. Ele deixará de processar registros adicionais em um estilhaço se sua função retornar um erro. Com mais estilhaços, há mais lotes sendo processados de uma só vez, o que diminui o impacto de erros na simultaneidade.

Se a função não conseguir se expandir para lidar com o número total de lotes simultâneos, solicite um aumento de limite ou reserve simultaneidade para a função.

Permissões da função de execução

O Lambda precisa das permissões a seguir para gerenciar recursos relacionados ao seu streaming de dados do Kinesis. Adicione-as à sua função função de execução.

A política gerenciada AWSLambdaKinesisExecutionRole inclui essas permissões. Para obter mais informações, consulte Função de execução do AWS Lambda.

Para enviar registros de lotes com falha para uma fila ou um tópico, sua função precisa de permissões adicionais. Cada serviço de destino requer uma permissão diferente, como se segue:

Configurar um stream como fonte de eventos

Crie um mapeamento de fonte de eventos para orientar o Lambda a enviar registros de seu streaming de dados para uma função do Lambda. É possível criar vários mapeamentos de origem de eventos para processar os mesmos dados com várias funções do Lambda ou processar itens de vários fluxos de dados com uma única função.

Para configurar sua função para leitura a partir do Kinesis no console do Lambda, crie um gatilho Kinesis.

Para criar um gatilho

  1. Abra a página Functions (Funções) do console do Lambda.

  2. Escolha uma função.

  3. Em Designer, escolha Add trigger (Adicionar gatilho).

  4. Escolha um tipo de gatinho.

  5. Configure as opções necessárias e, em seguida, escolha Add (Adicionar).

O Lambda oferece suporte às seguintes opções das fontes do evento do Kinesis.

Opções de fonte do evento

  • Kinesis stream (Fluxo do Kinesis) – O fluxo do Kinesis a partir do qual a leitura de registros é realizada.

  • Consumer (Consumidor) (opcional) – Use um consumidor de fluxo para ler o fluxo em uma conexão dedicada.

  • Batch size (Tamanho do lote) – o número de registros a serem enviados para a função em cada lote, até 10.000. O Lambda transmite todos os registros no lote para a função em uma única chamada, desde que o tamanho total dos eventos não exceda o limite de carga para a invocação síncrona (6 MB).

  • Batch window (Janela de lote) – especifique o máximo de tempo para reunir registros antes de invocar a função, em segundos.

  • Starting position (Posição inicial) – Processe apenas novos registros, todos os registros existentes ou registros criados após uma determinada data.

    • Latest (Mais recente) – Processe novos registros adicionados ao fluxo.

    • Trim horizon (Redução horizontal) – Processe todos os registros no fluxo.

    • At timestamp (Na data e hora) – Processe registros a partir de uma hora específica.

    Depois de processar todos os registros existentes, a função é capturada e continua a processar novos registros.

  • On-failure destination (Destino em caso de falha) – uma fila do SQS ou um tópico do SNS para registros que não possam ser processados. Quando o Lambda descarta um lote de registros porque é muito antigo ou esgotou todas as novas tentativas, ele envia detalhes sobre o lote para a fila ou o tópico.

  • Retry attempts (Novas tentativas) – o número máximo de vezes que o Lambda faz novas tentativas quando a função retorna um erro. Isso não se aplica a erros ou limitações do serviço em que o lote não atingiu a função.

  • Maximum age of record (Idade máxima do registro) – a idade máxima de um registro que o Lambda envia para sua função.

  • Split batch on error (Dividir o lote em caso de erro) – quando a função retorna um erro, divide o lote em dois antes de tentar novamente.

  • Concurrent batches per shard (Lotes simultâneos por estilhaço) – processa vários lotes do mesmo estilhaço simultaneamente.

  • Habilitado – Defina como verdadeiro para habilitar o mapeamento da origem do evento. Defina como falso para interromper o processamento de registros. O Lambda monitora o último registro processado e retoma o processamento a partir desse ponto quando habilitado novamente.

nota

O Kinesis cobra por cada fragmento e, para distribuição avançada, dados lidos do fluxo. Para obter detalhes de preço, consulte Definição de preço do Amazon Kinesis.

Para gerenciar a configuração da fonte do evento posteriormente, escolha o gatilho no designer.

API do mapeamento da fonte de eventos

Para gerenciar mapeamentos de origens de eventos com a AWS CLI ou o AWS SDK, use as seguintes ações de API:

Para criar o mapeamento de origem de eventos com a AWS CLI, use o comando create-event-source-mapping. O exemplo a seguir usa a AWS CLI para mapear uma função chamada my-function para um streaming de dados do Kinesis. O streaming de dados é especificado por um nome de recurso da Amazon (ARN), com um tamanho de lote de 500, a partir do time stamp no horário do Unix.

$ aws lambda create-event-source-mapping --function-name my-function \ --batch-size 500 --starting-position AT_TIMESTAMP --starting-position-timestamp 1541139109 \ --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream { "UUID": "2b733gdc-8ac3-cdf5-af3a-1827b3b11284", "BatchSize": 500, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1541139209.351, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

Para usar um consumidor, especifique o ARN do consumidor em vez do ARN do fluxo.

Configure opções adicionais para personalizar como os lotes são processados e para especificar quando descartar os registros que não podem ser processados. O exemplo a seguir atualiza um mapeamento de origem de evento para enviar um registro de falha para uma fila do SQS depois de duas novas tentativas ou se os registros tiverem mais uma hora.

$ aws lambda update-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --maximum-retry-attempts 2 --maximum-record-age-in-seconds 3600 --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-2:123456789012:dlq"}}' { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573243620.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Updating", "StateTransitionReason": "User action", "DestinationConfig": {}, "MaximumRecordAgeInSeconds": 604800, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 10000 }

As configurações atualizadas são aplicadas de forma assíncrona e não são refletidas na saída até que o processo seja concluído. Use o comando get-event-source-mapping para ver o status atual.

$ aws lambda get-event-source-mapping --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b { "UUID": "f89f8514-cdd9-4602-9e1f-01a5b77d449b", "BatchSize": 100, "MaximumBatchingWindowInSeconds": 0, "ParallelizationFactor": 1, "EventSourceArn": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream", "FunctionArn": "arn:aws:lambda:us-east-2:123456789012:function:my-function", "LastModified": 1573244760.0, "LastProcessingResult": "PROBLEM: Function call failed", "State": "Enabled", "StateTransitionReason": "User action", "DestinationConfig": { "OnFailure": { "Destination": "arn:aws:sqs:us-east-2:123456789012:dlq" } }, "MaximumRecordAgeInSeconds": 3600, "BisectBatchOnFunctionError": false, "MaximumRetryAttempts": 2 }

Para processar vários lotes simultaneamente, use a opção --parallelization-factor.

$ aws lambda update-event-source-mapping --uuid 2b733gdc-8ac3-cdf5-af3a-1827b3b11284 \ --parallelization-factor 5

Tratamento de erros

O mapeamento de origem de evento que lê registros do fluxo do Kinesis invoca sua função de forma síncrona e tenta novamente em caso de erros. Se a função for limitada ou o serviço Lambda retornar um erro sem invocar a função, o Lambda tentará novamente até os registros expirarem ou excederem a idade máxima que você configurar no mapeamento de origem de evento.

Se a função receber os registros, mas retornar um erro, o Lambda tentará novamente até os registros do lote expirarem, excederem a idade máxima ou atingirem o limite de novas tentativas configurado. Para erros de função, também é possível configurar o mapeamento de origem de evento para dividir um lote com falha em dois lotes. A nova tentativa com lotes menores isola os registros inválidos e contorna problemas de tempo limite. A divisão de um lote não é levada em consideração no limite de novas tentativas.

Se as medidas de tratamento de erros falharem, o Lambda descartará os registros e continuará a processar os lotes do fluxo. Com as configurações padrão, isso significa que um registro inválido pode bloquear o processamento no estilhaço afetado por até one week. Para evitar isso, configure o mapeamento de origem de evento da sua função com um número razoável de novas tentativas e uma idade máxima de registro adequada ao seu caso de uso.

Para manter um registro de lotes descartados, configure um destino de evento com falha. O Lambda envia um documento para a fila ou o tópico de destino com detalhes sobre o lote.

Como configurar um destino para registros de eventos com falha

  1. Abra a página Functions (Funções) do console do Lambda.

  2. Escolha uma função.

  3. Em Designer, escolha Add destination (Adicionar destino).

  4. Em Source (Origem), escolha Stream invocation (Chamada de fluxo).

  5. Para Stream (Fluxo), escolha um fluxo mapeado para a função.

  6. Em Destination type (Tipo de destino), escolha o tipo de recurso que recebe o registro da invocação.

  7. Em Destination (Destino), escolha um recurso.

  8. Escolha Save (Salvar).

O exemplo a seguir mostra um registro de invocação de um stream do Kinesis.

exemplo Registro de invocação

{ "requestContext": { "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5", "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted", "approximateInvokeCount": 1 }, "responseContext": { "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KinesisBatchInfo": { "shardId": "shardId-000000000001", "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722", "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186", "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z", "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z", "batchSize": 500, "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream" } }

Você pode usar essas informações para recuperar os registros afetados do fluxo para solução de problemas. Os registros reais não são incluídos, portanto, você deve processar esse registro e recuperá-los do fluxo antes que eles expirem ou sejam perdidos.

Métricas do Amazon CloudWatch

O Lambda emite a métrica IteratorAge quando a sua função termina de processar um lote de registros. A métrica indica a idade do último registro no lote quando o processamento foi concluído. Se a sua função estiver processando novos eventos, você poderá usar a idade do iterador para estimar a latência entre quando um registro é adicionado e quando a função o processa.

Uma tendência crescente na idade do iterador pode indicar problemas com sua função. Para obter mais informações, consulte Trabalhar com métricas de função do AWS Lambda.