Usar o Lambda com o Apache Kafka autogerenciado - AWS Lambda

Usar o Lambda com o Apache Kafka autogerenciado

nota

Se você deseja enviar dados para um destino que não seja uma função do Lambda ou enriquecer os dados antes de enviá-los, consulte Amazon EventBridge Pipes (Pipes do Amazon EventBridge).

O Lambda suportaApache Kafkacomo umOrigem do evento. O Apache Kafka é uma plataforma de streaming de eventos de código aberto que suporta cargas de trabalho, como pipelines de dados e análises de streaming.

Você pode usar o serviço do Kafka gerenciado pela AWS Amazon Managed Streaming for Apache Kafka (Amazon MSK) ou um cluster do Kafka autogerenciado. Para obter detalhes sobre o uso do Lambda com o Amazon MSK, consulte Usar o Lambda com o Amazon MSK.

Este tópico descreve como usar o Lambda com um cluster Kafka autogerenciado. Na terminologia da AWS, um cluster autogerenciado inclui clusters do Kafka não hospedados pela AWS. Por exemplo, você pode hospedar seu cluster Kafka com um provedor de nuvem, como Confluent Cloud.

O Apache Kafka como uma fonte de eventos opera de forma semelhante ao uso do Amazon Simple Queue Service (Amazon SQS) ou do Amazon Kinesis. O Lambda pesquisa internamente por novas mensagens da origem do evento e, em seguida, chama de forma síncrona a função do Lambda de destino. O Lambda lê as mensagens em lotes e fornece estas para a sua função como uma carga de eventos. O tamanho máximo do lote é configurável. (O valor padrão é de 100 mensagens.)

Atenção

Os mapeamentos da origem do evento do Lambda processam cada evento ao menos uma vez, podendo haver o processamento duplicado de registros. Para evitar possíveis problemas relacionados a eventos duplicados, é altamente recomendável tornar o código da função idempotente. Para saber mais, consulte Como tornar minha função do Lambda idempotente no Centro de Conhecimentos da AWS.

Para fontes de eventos baseadas em Kafka, o Lambda oferece suporte a parâmetros de controle de processamento, como janelas de lotes e tamanho do lote. Para ter mais informações, consulte Comportamento de lotes.

Para obter um exemplo de como usar o Kafka autogerenciado como uma fonte de eventos, consulteUsar o Apache Kafka auto-hospedado como uma fonte de eventos para o AWS Lambda no blog AWSCompute.

Evento de exemplo

O Lambda envia o lote de mensagens no parâmetro de evento quando ele invoca sua função do Lambda. O payload do evento contém uma matriz de mensagens. Cada item da matriz contém detalhes do tópico do Kafka e do identificador de partição do Kafka, juntamente com um carimbo de data/hora e uma mensagem codificada em base64.

{ "eventSource": "SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Autenticação de clusters do Kafka

O Lambda suporta vários métodos para autenticar com seu cluster Apache Kafka autogerenciado. Configure o cluster Kafka para utilizar um dos métodos de autenticação compatíveis. Para obter mais informações sobre a segurança do Kafka, consulte a seção Segurança da documentação do Kafka.

Acesso por VPC

Se apenas os usuários do Kafka de sua VPC acessarem seus agentes do Kafka, será necessário configurar a fonte de eventos com o acesso da Amazon Virtual Private Cloud (Amazon VPC).

Autenticação SASL/SCRAM

O Lambda oferece suporte à autenticação Simple Authentication e Security Layer/Salted Challenge Response Authentication Mechanism (SASL/SCRAM) com criptografia (SASL_SSL) Transport Layer Security (TLS). O Lambda envia as credenciais criptografadas para autenticar com o cluster. O Lambda não oferece suporte a SASL/SCRAM com texto simples (SASL_PLAINTEXT). Para obter mais informações sobre a autenticação do SASL/SCRAM, consulte RFC 5802.

O Lambda também oferece suporte à autenticação SASL/PLAIN. Como esse mecanismo usa credenciais de texto não criptografado, a conexão com o servidor deve usar criptografia TLS para garantir que as credenciais sejam protegidas.

Para a autenticação SASL, armazene as credenciais de login como um segredo no AWS Secrets Manager. Para obter mais informações sobre o uso do Secrets Manager, consulte Tutorial: Create and retrieve a secret (Criar e recuperar um segredo) no Guia do usuário do AWS Secrets Manager.

Importante

Para usar o Secrets Manager para autenticação, os segredos devem estar armazenados na mesma região da AWS que sua função do Lambda.

Autenticação TLS mútua

O TLS mútuo (mTLS) fornece autenticação bidirecional entre o cliente e o servidor. O cliente envia um certificado ao servidor para que o servidor verifique o cliente, e o servidor envia um certificado ao cliente para que o cliente verifique o servidor.

No Apache Kafka autogerenciado, o Lambda atua como cliente. Você configura um certificado de cliente (como um segredo no Secrets Manager) para autenticar o Lambda com os agentes do Kafka. O certificado do cliente deve ser assinado por uma autoridade de certificação no armazenamento de confiança do servidor.

O cluster do Kafka envia um certificado de servidor ao Lambda para autenticar os agentes do Kafka com o Lambda. O certificado do servidor pode ser um certificado CA público ou um certificado de CA privado/autoassinado. O certificado CA público deve ser assinado por uma autoridade de certificação (CA) que esteja no repositório de confiança do Lambda. Para um certificado CA privado/autoassinado, configure o certificado CA raiz do servidor (como um segredo no Secrets Manager). O Lambda usa o certificado raiz para verificar os agentes do Kafka.

Para obter mais informações sobre o mTLS, consulte Introducing mutual TLS authentication for Amazon MSK as an event source (Introdução à autenticação TLS mútua para o Amazon MSK como fonte de eventos).

Configurar o segredo do certificado do cliente

O segredo CLIENT_CERTIFICATE_TLS_AUTH requer um campo de certificado e um campo de chave privada. Para uma chave privada criptografada, o segredo requer uma senha de chave privada. Tanto o certificado como a chave privada devem estar no formato PEM.

nota

O Lambda oferece suporte a algoritmos de criptografia de chave privada PBES1 (mas não PBES2).

O campo certificate (certificado) deve conter uma lista de certificados, começando pelo certificado do cliente, seguido por quaisquer certificados intermediários e terminando com o certificado raiz. Cada certificado deve iniciar em uma nova linha com a seguinte estrutura:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

O Secrets Manager oferece suporte a segredos de até 65.536 bytes, que é espaço suficiente para cadeias de certificados longas.

A chave privada deve estar no formato PKCS #8, com a seguinte estrutura:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

Para uma chave privada criptografada, use a seguinte estrutura:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

O exemplo a seguir exibe o conteúdo de um segredo para autenticação mTLS usando uma chave privada criptografada. Para uma chave privada criptografada, inclua a senha de chave privada no segredo.

{"privateKeyPassword":"testpassword", "certificate":"-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Configurar o segredo do certificado CA raiz do servidor

Crie esse segredo se seus agentes do Kafka usarem criptografia TLS com certificados assinados por uma CA privada. É possível usar criptografia TLS para autenticação de VPC, SASL/SCRAM, SASL/PLAIN ou mTLS.

O segredo do certificado CA raiz do servidor requer um campo que contenha o certificado CA raiz do agente do Kafka no formato PEM. O exemplo a seguir exibe a estrutura do segredo.

{"certificate":"-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----" }

Gerenciar acesso e permissões de API

Além de acessar seu cluster do Kafka autogerenciado, a função Lambda necessita de permissões para executar várias ações da API. Adicione essas permissões à função de execução da função. Se os usuários precisarem acessar alguma ação da API, adicione as permissões necessárias à política de identidade para o usuário ou a função do AWS Identity and Access Management (IAM).

Permissões de função do Lambda necessárias

Para criar e armazenar logs em um grupo de logs do Amazon CloudWatch Logs, sua função Lambda deve ter as seguintes permissões na função de execução:

Permissões de função do Lambda opcionais

Sua função Lambda também pode precisar dessas permissões para:

  • Descreva o segredo do Secrets Manager.

  • Acessar a chave gerenciada pelo cliente AWS Key Management Service (AWS KMS)

  • Acesse sua Amazon VPC.

  • Enviar registros de invocações com falha para um destino

Secrets Manager e permissões do AWS KMS

Conforme o tipo de controle de acesso que você está configurando para seus agentes do Kafka, sua função Lambda poderá precisar de permissão para acessar seu segredo do Secrets Manager ou descriptografar sua chave do AWS KMS gerenciada pelo cliente. Para acessar esses recursos, a função de execução da função precisa ter as seguintes permissões:

Permissões da VPC

Se somente usuários dentro de uma VPC puderem acessar seu cluster do Apache Kafka autogerenciado, a função Lambda deverá ter permissão para acessar seus recursos da Amazon VPC. Esses recursos incluem sua VPC, sub-redes, security groups e interfaces de rede. Para acessar esses recursos, a função de execução da função precisa ter as seguintes permissões:

Adicionar permissões à sua função de execução

Para acessar outros serviços da AWS que seu cluster autogerenciado do Apache Kafka usa, o Lambda utiliza as políticas de permissão que você define na função de execução da função do Lambda.

Por padrão, o Lambda não pode executar as ações obrigatórias ou opcionais para um cluster do Apache Kafka autogerenciado. Você deve criar e definir essas ações em uma política de confiança do IAM e, em seguida, associar a política à sua função de execução. Este exemplo mostra como criar uma política que permita que o Lambda acesse seus recursos da Amazon VPC.

{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"*" } ] }

Para obter informações sobre como criar um documento de política JSON no console do IAM, consulte Criar políticas na guia JSON no Guia do usuário do IAM.

Conceder acesso aos usuários com uma política do IAM

Por padrão, usuários e perfis não têm permissão para executar operações de API de origem de eventos. Para conceder acesso a usuários de sua organização ou conta, crie ou atualize uma política baseada em identidades. Para obter mais informações, consulte Controlar o acesso aos recursos da AWS usando políticas no Manual do usuário do IAM.

Erros de autenticação e autorização

Se alguma das permissões necessárias para consumir dados do cluster do Kafka estiver ausente, o Lambda exibirá uma das mensagens a seguir de erro no mapeamento da fonte de eventos em LastProcessingResult.

O cluster falhou ao autorizar o Lambda

Para SASL/SCRAM ou mTLS, esse erro indica que o usuário fornecido não tem todas estas permissões da lista de controle de acesso (ACL) do Kafka necessárias:

  • Cluster DescribeConfigs

  • Descrever grupo

  • Ler grupo

  • Descrever tópico

  • Ler tópico

Ao criar ACLs do Kafka com as permissões necessárias do kafka-cluster, especifique o tópico e o grupo como recursos. O nome do tópico deve corresponder ao tópico no mapeamento da fonte de eventos. O nome do grupo deve corresponder ao UUID do mapeamento da fonte de eventos.

Depois de adicionar as permissões necessárias à função de execução, poderá levar vários minutos para que as alterações entrem em vigor.

SASL authentication failed (Falha na autenticação SASL)

Em SASL/SCRAM ou SASL/PLAIN, esse erro indica que as credenciais de login fornecidas não são válidas.

Server failed to authenticate Lambda (Falha ao autenticar o Lambda no servidor)

Esse erro indica que o agente do Kafka não conseguiu autenticar o Lambda. Esse erro pode ocorrer por estes motivos:

  • Você não forneceu um certificado de cliente para autenticação mTLS.

  • Você forneceu um certificado de cliente, mas os agentes do Kafka não estão configurados para usar a autenticação mTLS.

  • Um certificado de cliente não é confiável para os agentes do Kafka.

Lambda failed to authenticate server (Falha ao autenticar o servidor no Lambda)

Esse erro indica que o Lambda não conseguiu autenticar o agente do Kafka. Esse erro pode ocorrer por estes motivos:

  • Os agentes do Kafka usam certificados autoassinados ou uma CA privada, mas não fornecem o certificado CA raiz do servidor.

  • O certificado CA raiz do servidor não corresponde à CA raiz que assinou o certificado do agente.

  • A validação do nome de host falhou porque o certificado do agente não contém o nome DNS ou o endereço IP do agente como nome alternativo do assunto.

Provided certificate or private key is invalid (O certificado ou a chave privada fornecida é inválida)

Esse erro indica que o consumidor do Kafka não pôde usar o certificado fornecido ou a chave privada. Verifique se o certificado e a chave usam o formato PEM e que a criptografia de chave privada usa um algoritmo PBES1.

Configuração de rede

Para que o Lambda use seu cluster do Kafka como uma origem de eventos, ele precisa de acesso à Amazon VPC na qual o cluster reside. Recomendamos implantar endpoints da VPC do AWS PrivateLink para o Lambda acessar sua VPC. Implante endpoints para Lambda e AWS Security Token Service (AWS STS). Se o agente usar autenticação, implante também um endpoint da VPC para o Secrets Manager. Se você tiver configurado um destino em caso de falha, implante também um endpoint da VPC para o serviço de destino.

Alternativamente, certifique-se de que a VPC associada ao cluster do Kafka inclua um gateway NAT por sub-rede pública. Para ter mais informações, consulte Habilitar o acesso à Internet para funções do Lambda conectadas à VPC.

Se você usa endpoints da VPC, também deve configurá-los para habilitar nomes DNS privados.

Quando você cria um mapeamento da origem do evento para um cluster autogerenciado do Apache Kafka, o Lambda verifica se as interfaces de rede elástica (ENIs) já estão presentes nas sub-redes e nos grupos de segurança da VPC do seu cluster. Se o Lambda encontrar ENIs existentes, ele tentará reutilizá-las. Caso contrário, o Lambda criará novas ENIs para se conectar à origem do evento e invocar sua função.

nota

As funções do Lambda sempre são executadas em VPCs de propriedade do serviço Lambda. Essas VPCs recebem manutenção automática do serviço e não são visíveis para os clientes. Você também pode conectar sua função a uma Amazon VPC. Em ambos os casos, a configuração de VPC da sua função não afetará o mapeamento da origem do evento. Somente a configuração da VPC da origem de eventos determina o modo de conexão do Lambda à sua origem de eventos.

Para obter mais informações sobre como configurar a rede, consulteConfigurar o AWS Lambda com um cluster do Apache Kafka em uma VPC no blog AWS Compute.

Regras de grupos de segurança da VPC

Configure com as seguintes regras (no mínimo) os grupos de segurança da Amazon VPC que contêm seu cluster:

  • Regras de entrada: permitir todo o tráfego na porta do agente do Kafka para os grupos de segurança especificados para sua fonte de eventos. O Kafka usa a porta 9092 por padrão.

  • Regras de saída: permitir todo o tráfego na porta 443 para todos os destinos. Permita todo o tráfego na porta do agente do Kafka para os grupos de segurança especificados para sua fonte de eventos. O Kafka usa a porta 9092 por padrão.

  • Se você estiver usando endpoints da VPC em vez de um gateway NAT, os grupos de segurança associados aos endpoints da VPC deverão permitir todo o tráfego de entrada na porta 443 dos grupos de segurança da fonte de eventos.

Trabalhar com endpoints da VPC

Quando você usa endpoints da VPC, as chamadas de API para invocar sua função são roteadas por esses endpoints usando as ENIs. A entidade principal do serviço Lambda precisa chamar sts:AssumeRole e lambda:InvokeFunction para quaisquer perfis e funções que usem essas ENIs.

Por padrão, os endpoints da VPC têm políticas do IAM que são abertas. A prática recomendada é restringir essas políticas a fim de permitir que somente entidades principais específicas executem as ações necessárias usando esse endpoint. Para garantir que seu mapeamento da origem do evento seja capaz de invocar sua função do Lambda, a política de endpoint da VPC deve permitir que a entidade principal do serviço Lambda chame sts:AssumeRole e lambda:InvokeFunction. A restrição de suas políticas de endpoint da VPC para permitir apenas chamadas de API originadas em sua organização impedirá o funcionamento adequado do mapeamento da origem do evento.

O seguinte exemplo de políticas de endpoint da VPC mostra como conceder o acesso necessário à entidade principal do serviço Lambda para os endpoints do AWS STS e Lambda.

exemplo Política de endpoint da VPC: endpoint do AWS STS
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
exemplo Política de endpoint da VPC: endpoint do Lambda
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

Se o seu agente do Kafka usar autenticação, você também poderá restringir a política de endpoint da VPC para o endpoint do Secrets Manager. Para chamar a API do Secrets Manager, o Lambda usa seu perfil de função, e não a entidade principal de serviço do Lambda. O exemplo a seguir mostra uma política de endpoint do Secrets Manager.

exemplo Política de endpoint da VPC: endpoint do Secrets Manager.
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

Se você tiver um destino em caso de falha configurado, o Lambda também usará o perfil de sua função para chamar s3:PutObject, sns:Publish ou sqs:sendMessage usando as ENIs gerenciadas pelo Lambda.

Adicionar um cluster do Kafka como uma origem de evento

Para criar um mapeamento de origem de evento, adicione seu cluster do Kafka como um acionador de função do Lambda usando o console do Lambda, um AWS SDK ou a AWS Command Line Interface (AWS CLI).

Esta seção descreve como criar um mapeamento de origens de eventos do usando o console do Lambda e aAWS CLI.

Pré-requisitos

  • Um cluster Apache Kafka autogerenciado. O Lambda é compatível com o Apache Kafka versão 0.10.1.0 e posteriores.

  • Uma função de execução com permissão para acessar os recursos da AWS que o cluster do Kafka autogerenciado usa.

ID de grupo de consumidores personalizável

Ao configurar o Kafka como uma origem de eventos, você pode especificar um ID de grupo de consumidores. Esse ID de grupo de consumidores é um identificador existente para o grupo de consumidores do Kafka no qual você deseja que a função do Lambda ingresse. Você pode usar esse recurso para migrar facilmente qualquer configuração de processamento em andamento de registros do Kafka de outros consumidores para o Lambda.

Se você especificar um ID de grupo de consumidores e houver outros pesquisadores ativos dentro desse grupo de consumidores, o Kafka distribuirá mensagens entre todos os consumidores. Em outras palavras, o Lambda não receberá todas as mensagens para o tópico do Kafka. Se você quiser que o Lambda gerencie todas as mensagens do tópico, desative todos os outros pesquisadores desse grupo de consumidores.

Além disso, se você especificar um ID de grupo de consumidores e o Kafka encontrar um grupo de consumidores válido já existente com o mesmo ID, o Lambda ignorará o parâmetro StartingPosition no mapeamento de origem de eventos. Em vez disso, o Lambda começará a processar registros de acordo com o deslocamento confirmado do grupo de consumidores. Se você especificar um ID de grupo de consumidores e o Kafka não conseguir encontrar um grupo de consumidores existente, o Lambda configurará a origem de eventos com a StartingPosition especificada.

O ID do grupo de consumidores que você especificar deverá ser exclusivo entre todas as origens de eventos do Kafka. Após criar um mapeamento de origem de eventos do Kafka com o ID do grupo de consumidores especificado, você não poderá atualizar esse valor.

Adicionando um cluster Kafka autogerenciado (console)

Siga estas etapas para adicionar seu cluster autogerenciado do Apache Kafka e um tópico do Kafka como um acionador para sua função do Lambda.

Para adicionar um acionador do Apache Kafka à sua função do Lambda (console)
  1. Abra a página Functions (Funções) no console do Lambda.

  2. Escolha o nome da função do Lambda.

  3. Em Visão geral da função, escolha Adicionar gatilho.

  4. Em Trigger configuration (Configuração do acionador), faça o seguinte:

    1. Selecione o tipo de acionadro Apache Kafka.

    2. para oServidores de bootstrap, insira o endereço de host e par de portas de um corretor Kafka em seu cluster e escolhaAdicionar. Repita para cada agente da Kafka no cluster.

    3. para oNome do tópico, insira o nome do tópico do Kafka usado para armazenar registros no cluster.

    4. (Opcional) ParaTamanho do lote, insira o número máximo de registros a serem recebidos em um único lote.

    5. Em Batch window (Janela de lote), insira o tempo máximo em segundos usado pelo Lambda para coletar os registros antes de invocar a função.

    6. (Opcional) em Consumer group ID (ID do grupo de consumidores), insira o ID de um grupo de consumidores do Kafka no qual ingressar.

    7. (Opcional) Em Posição inicial, escolha Mais recente para começar a realizar a leitura do fluxo a partir do registro mais recente, Horizonte de corte para começar no registro mais antigo disponível ou No carimbo de data e hora para especificar um carimbo de data e hora para começar a realizar a leitura.

    8. (Opcional) Em VPC, escolha a Amazon VPC para seu cluster do Kafka. Em seguida, escolha VPC subnets (Sub-redes da VPC) e VPC security groups (Grupos de segurança da VPC).

      Essa configuração será necessária se apenas os usuários da VPC acessarem seus agentes.

    9. (Opcional) Em Authentication (Autenticação), escolha Add (Adicionar) e faça o seguinte:

      1. Escolha o protocolo de acesso ou a autenticação dos agentes do Kafka no cluster.

        • Se o agente do Kafka usar autenticação SASL/PLAIN, escolha BASIC_AUTH.

        • Se seu agente usar autenticação SASL/SCRAM, escolha um dos protocolos SASL_SCRAM.

        • Se estiver configurando a autenticação mTLS, escolha o protocolo CLIENT_CERTIFICATE_TLS_AUTH.

      2. Para a autenticação SASL/SCRAM ou mTLS, escolha a chave secreta do Secrets Manager que contém as credenciais de seu cluster do Kafka.

    10. (Opcional) Em Encryption (Criptografia), escolha o segredo do Secrets Manager que contém o certificado CA raiz que seus agentes do Kafka usam para criptografia TLS, se seus agentes do Kafka usarem certificados assinados por uma CA privada.

      Essa configuração se aplica à criptografia TLS para SASL/SCRAM ou SASL/PLAIN e à autenticação MTLs.

    11. Para criar o gatilho em um estado desativado para teste (recomendado), desmarqueAtivar gatilho. Ou, para habilitar o gatilho imediatamente, selecioneAtivar gatilho.

  5. Para criar o acionador, selecione Add (Adicionar).

Adicionando um cluster Kafka autogerenciado (AWS CLI)

Use os comandos de exemplo da AWS CLI a seguir para criar e visualizar um acionador autogerenciado do Apache Kafka para sua função do Lambda.

Usar SASL/SCRAM

Se usuários do Kafka acessarem seus agentes do Kafka pela Internet, especifique o segredo do Secrets Manager criado para autenticação SASL/SCRAM. O exemplo a seguir usa o comando create-event-source-mapping da AWS CLI para mapear uma função do Lambda chamada my-kafka-function em um tópico do Kafka chamado AWSKafkaTopic.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Usar uma VPC

Se apenas os usuários do Kafka em sua VPC acessarem seus agentes do Kafka, você deverá especificar sua VPC, suas sub-redes e seu grupo de segurança da VPC. O exemplo a seguir usa o comando create-event-source-mapping da AWS CLI para mapear uma função do Lambda chamada my-kafka-function em um tópico do Kafka chamado AWSKafkaTopic.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Visualizar o status usando a AWS CLI

O exemplo a seguir usa o comando get-event-source-mapping da AWS CLI para descrever o status do mapeamento de fontes de eventos que você criou.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Destinos em caso de falha

Para reter registros de invocações de mapeamento da origem do evento com falha, adicione um destino ao mapeamento da origem de eventos da função. Cada registro enviado ao destino é um documento JSON com metadados sobre a invocação que falhou. É possível configurar qualquer tópico do Amazon SNS, fila do Amazon SQS ou bucket do S3 como destino. Sua função de execução deve ter permissões para o destino:

Além disso, se você configurou uma chave do KMS no seu destino, o Lambda precisará das seguintes permissões, dependendo do tipo de destino:

  • Se você habilitou a criptografia com sua própria chave do KMS para um destino do S3, kms:GenerateDataKey é necessário. Se a chave do KMS e o destino do bucket do S3 estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:GenerateDataKey.

  • Se você habilitou a criptografia com sua própria chave do KMS para um destino do SQS, kms:Decrypt e kms:GenerateDataKey são necessários. Se a chave do KMS e o destino da fila do SQS estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey e kms:ReEncrypt.

  • Se você habilitou a criptografia com sua própria chave do KMS para um destino do SNS, kms:Decrypt e kms:GenerateDataKey são necessários. Se a chave do KMS e o destino do tópico do SNS estiverem em uma conta diferente da função do Lambda e do perfil de execução, configure a chave do KMS para confiar no perfil de execução para permitir kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey e kms:ReEncrypt.

Para configurar um destino em caso de falha usando o console, siga estas etapas:

  1. Abra a página Funções do console do Lambda.

  2. Escolha uma função.

  3. Em Function overview (Visão geral da função), escolha Add destination (Adicionar destino).

  4. Em Origem, escolha Invocação do mapeamento da origem do evento.

  5. Em Mapeamento da origem do evento, escolha uma origem de eventos configurada para essa função.

  6. Em Condição, selecione Em caso de falha. Para invocações de mapeamento da origem de eventos, essa é a única condição aceita.

  7. Em Tipo de destino, escolha o tipo de destino para o qual o Lambda envia registros de invocação.

  8. Em Destination (Destino), escolha um recurso.

  9. Escolha Salvar.

Você também pode configurar um destino em caso de falha usando a AWS CLI. Por exemplo, o seguinte comando create-event-source-mapping adiciona um mapeamento de origem de eventos com um destino SQS em caso de falha a MyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

O seguinte comando update-event-source-mapping adiciona um destino S3 em caso de falha à origem de eventos associada à entrada uuid:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Para remover um destino, forneça uma string vazia como argumento para o parâmetro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Exemplo de registro de invocação do SNS e do SQS

O exemplo a seguir mostra o que é enviado pelo Lambda para o destino de tópico do SNS ou de fila do SQS em caso de falha na invocação da origem de eventos do Kafka. Cada uma das chaves abaixo de recordsInfo contém o tópico e a partição do Kafka separados por um hífen. Por exemplo, para a chave "Topic-0", Topic é o tópico do Kafka, e 0 é a partição. Para cada tópico e partição, você pode usar os dados de desvios e do carimbo de data/hora para encontrar os registros de invocação originais.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Exemplo de registro de invocação de destino S3

Para destinos do S3, o Lambda envia todo o registro da invocação junto com os metadados para o destino. O exemplo a seguir mostra o que é enviado pelo Lambda para o destino de bucket do S3 em caso de falha na invocação da origem de eventos do Kafka. Além de todos os campos do exemplo anterior para destinos do SQS e do SNS, o campo payload contém o registro de invocação original como uma string JSON com escape.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
dica

Recomendamos habilitar o versionamento do S3 no bucket de destino.

Usar um cluster do Kafka como uma fonte de eventos

Quando você adiciona seu cluster Apache Kafka como um gatilho para sua função do Lambda, o cluster é usado como umOrigem do evento.

O Lambda lê os dados de eventos dos tópicos do Kafka que você especifica como Topics em uma solicitação CreateEventSourceMapping com base na StartingPosition especificada. Após o processamento bem-sucedido, seu tópico do Kafka é confirmado no cluster do Kafka.

Se você especificar StartingPosition como LATEST, o Lambda começará a ler da mensagem mais recente em cada partição pertencente ao tópico. Como pode haver algum atraso após a configuração do acionador antes de o Lambda começar a ler as mensagens, o Lambda não lerá nenhuma mensagem produzida durante a janela.

O Lambda processa registros de uma ou mais partições de tópico do Kafka especificadas e envia uma carga útil JSON à função. Quando mais registros estiverem disponíveis, o Lambda continuará processando-os em lotes, com base no valor de BatchSize especificado na solicitação CreateEventSourceMapping, até que a função atinja o tópico.

Se sua função retorna um erro para qualquer uma das mensagens em um lote, o Lambda tenta novamente todo o lote de mensagens até que o processamento seja bem-sucedido ou as mensagens expiram. É possível enviar registros que apresentaram falha em todas as tentativas a um destino em caso de falha para processamento posterior.

nota

Embora as funções do Lambda normalmente tenham um limite máximo de tempo de 15 minutos, os mapeamentos da origem dos eventos para o Amazon MSK, o Apache Kafka autogerenciado, o Amazon DocumentDB e o Amazon MQ para ActiveMQ e RabbitMQ são compatíveis somente com funções com limites máximos de tempo limite de 14 minutos. Essa restrição garante que o mapeamento da origem do evento possa solucionar adequadamente os erros de função e repetições.

Posições iniciais de sondagem e fluxo

Esteja ciente de que a sondagem do fluxo durante a criação e as atualizações do mapeamento da origem do evento é, finalmente, consistente.

  • Durante a criação do mapeamento da origem do evento, pode levar alguns minutos para a sondagem de eventos do fluxo iniciar.

  • Durante as atualizações do mapeamento da origem do evento, pode levar alguns minutos para interromper e reiniciar a sondagem de eventos do fluxo.

Esse comportamento significa que, se você especificar LATEST como posição inicial do fluxo, o mapeamento da origem do evento poderá perder eventos durante a criação ou as atualizações. Para garantir que nenhum evento seja perdido, especifique a posição inicial do fluxo como TRIM_HORIZON ou AT_TIMESTAMP.

Dimensionamento automático da origem do evento Kafka

Quando você inicialmente cria uma fonte de eventos do Apache Kafka, o Lambda aloca um consumidor para processar todas as partições no tópico do Kafka. Cada consumidor conta com vários processadores em execução em paralelo para lidar com um aumento de workloads. O Lambda aumenta ou reduz a escala na vertical automaticamente do número de consumidores com base na workload. Para preservar a ordenação de mensagens em cada partição, o número máximo de consumidores é um consumidor por partição no tópico.

A cada um minuto, o Lambda avalia o atraso de compensação do consumidor de todas as partições do tópico. Se o atraso for muito alto, a partição está recebendo mensagens mais rápido do que o Lambda pode processá-las. Se necessário, o Lambda adiciona ou remove os consumidores do tópico. O processo de escalabilidade de adicionar ou remover consumidores ocorre em até três minutos após a avaliação.

Se sua função alvo do Lambda estiver sobrecarregada, o Lambda reduz o número de consumidores. Essa ação reduz a workload na função, reduzindo o número de mensagens que os consumidores podem recuperar e enviar para a função.

Para monitorar o throughput do tópico do Kafka, você pode visualizar as métricas de consumo do Apache Kafka, como consumer_lageconsumer_offset. Para verificar quantas invocações de função ocorrem em paralelo, você também pode monitorar oMétricas de simultaneidadePara a função do.

Erros de mapeamento da fonte de eventos

Quando você adicionar seu cluster do Apache Kafka como uma fonte de eventos para sua função do Lambda, se ela encontrar um erro, o consumidor do Kafka interrompe o processamento de registros. Os consumidores de uma partição de tópico são aqueles que se inscrevem, leem e processam seus registros. Seus outros consumidores do Kafka podem continuar processando registros, desde que não encontrem o mesmo erro.

Para determinar a causa de um consumidor parado, verifique o campo StateTransitionReason na resposta do EventSourceMapping. A lista a seguir descreve os erros de origem do evento que você pode receber:

ESM_CONFIG_NOT_VALID

A configuração do mapeamento da fonte de eventos não é válida.

EVENT_SOURCE_AUTHN_ERROR

O Lambda não conseguiu autenticar a fonte de eventos.

EVENT_SOURCE_AUTHZ_ERROR

O Lambda não tem as permissões necessárias para acessar a fonte de eventos.

FUNCTION_CONFIG_NOT_VALID

A configuração da função não é válida.

nota

Se os registros de eventos do Lambda excederem o limite de tamanho permitido de 6 MB, eles poderão ficar sem processamento.

Métricas do Amazon CloudWatch

O Lambda emite a métrica OffsetLag enquanto sua função processa registros. O valor dessa métrica é a diferença de deslocamento entre o último registro gravado no tópico da origem de eventos do Kafka e o último registro que o grupo de consumidores da função processou. Você pode usar OffsetLag para estimar a latência entre o momento em que um registro é adicionado e o momento em que o grupo de consumidores o processa.

Uma tendência crescente em OffsetLag pode indicar problemas com pesquisadores no grupo de consumidores da função. Para ter mais informações, consulte Trabalhar com métricas de funções Lambda.

Parâmetros de configuração autogerenciados do Apache Kafka

Todos os tipos de origem de evento Lambda compartilham o mesmoCreateEventSourceMappingeUpdateEventSourceMappingOperações da API. No entanto, apenas alguns dos parâmetros se aplicam ao Apache Kafka.

Parâmetros da fonte de eventos que se aplicam ao Apache Kafka autogerenciado
Parâmetro Obrigatório Padrão Observações

BatchSize

N

100

Máximo: 10.000.

Habilitado

N

Habilitado

FunctionName

Y

FilterCriteria

N

Filtragem de eventos do Lambda

MaximumBatchingWindowInSeconds

N

500 ms

Comportamento de lotes

SelfManageDeventSource

Y

Lista de corretores Kafka. Pode definir apenas em Criar

SelfManagedKafkaEventSourceConfig

N

Contém o campo ConsumerGroupId que assume por padrão um valor exclusivo.

Pode definir apenas em Criar

SourceAccessConfigurations

N

Nenhuma credencial

Informações da VPC ou credenciais de autenticação para o cluster

Para SASL_PLAIN, defina como BASIC_AUTH

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON, ou LATEST

Pode definir apenas em Criar

StartingPositionTimestamp

N

Obrigatório se StartingPosition estiver definido como AT_TIMESTAMP

Tópicos

Y

Nome do tópico

Pode definir apenas em Criar