Processar mensagens do Apache Kafka autogerenciado com o Lambda
nota
Se você deseja enviar dados para um destino que não seja uma função do Lambda ou enriquecer os dados antes de enviá-los, consulte Amazon EventBridge Pipes (Pipes do Amazon EventBridge).
Tópicos
Adicionar um cluster do Kafka como uma origem de evento
Para criar um mapeamento de origem de evento, adicione seu cluster do Kafka como um acionador de função do Lambda usando o console do Lambda, um AWS SDK
Esta seção descreve como criar um mapeamento de origens de eventos do usando o console do Lambda e aAWS CLI.
Pré-requisitos
-
Um cluster Apache Kafka autogerenciado. O Lambda é compatível com o Apache Kafka versão 0.10.1.0 e posteriores.
-
Uma função de execução com permissão para acessar os recursos da AWS que o cluster do Kafka autogerenciado usa.
ID de grupo de consumidores personalizável
Ao configurar o Kafka como uma origem de eventos, você pode especificar um ID de grupo de consumidores. Esse ID de grupo de consumidores é um identificador existente para o grupo de consumidores do Kafka no qual você deseja que a função do Lambda ingresse. Você pode usar esse recurso para migrar facilmente qualquer configuração de processamento em andamento de registros do Kafka de outros consumidores para o Lambda.
Se você especificar um ID de grupo de consumidores e houver outros pesquisadores ativos dentro desse grupo de consumidores, o Kafka distribuirá mensagens entre todos os consumidores. Em outras palavras, o Lambda não receberá todas as mensagens para o tópico do Kafka. Se você quiser que o Lambda gerencie todas as mensagens do tópico, desative todos os outros pesquisadores desse grupo de consumidores.
Além disso, se você especificar um ID de grupo de consumidores e o Kafka encontrar um grupo de consumidores válido já existente com o mesmo ID, o Lambda ignorará o parâmetro StartingPosition
no mapeamento de origem de eventos. Em vez disso, o Lambda começará a processar registros de acordo com o deslocamento confirmado do grupo de consumidores. Se você especificar um ID de grupo de consumidores e o Kafka não conseguir encontrar um grupo de consumidores existente, o Lambda configurará a origem de eventos com a StartingPosition
especificada.
O ID do grupo de consumidores que você especificar deverá ser exclusivo entre todas as origens de eventos do Kafka. Após criar um mapeamento de origem de eventos do Kafka com o ID do grupo de consumidores especificado, você não poderá atualizar esse valor.
Adicionando um cluster Kafka autogerenciado (console)
Siga estas etapas para adicionar seu cluster autogerenciado do Apache Kafka e um tópico do Kafka como um acionador para sua função do Lambda.
Para adicionar um acionador do Apache Kafka à sua função do Lambda (console)
-
Abra a página Functions
(Funções) no console do Lambda. -
Escolha o nome da função do Lambda.
-
Em Visão geral da função, escolha Adicionar gatilho.
-
Em Trigger configuration (Configuração do acionador), faça o seguinte:
-
Selecione o tipo de acionadro Apache Kafka.
-
Para Servidores de bootstrap, insira o endereço de host e par de portas de um corretor Kafka em seu cluster e escolha Adicionar. Repita para cada agente da Kafka no cluster.
-
para oNome do tópico, insira o nome do tópico do Kafka usado para armazenar registros no cluster.
-
(Opcional) ParaTamanho do lote, insira o número máximo de registros a serem recebidos em um único lote.
-
Em Batch window (Janela de lote), insira o tempo máximo em segundos usado pelo Lambda para coletar os registros antes de invocar a função.
-
(Opcional) em Consumer group ID (ID do grupo de consumidores), insira o ID de um grupo de consumidores do Kafka no qual ingressar.
-
(Opcional) Em Posição inicial, escolha Mais recente para começar a realizar a leitura do fluxo a partir do registro mais recente, Horizonte de corte para começar no registro mais antigo disponível ou No carimbo de data e hora para especificar um carimbo de data e hora para começar a realizar a leitura.
-
(Opcional) Em VPC, escolha a Amazon VPC para seu cluster do Kafka. Em seguida, escolha VPC subnets (Sub-redes da VPC) e VPC security groups (Grupos de segurança da VPC).
Essa configuração será necessária se apenas os usuários da VPC acessarem seus agentes.
-
(Opcional) Em Authentication (Autenticação), escolha Add (Adicionar) e faça o seguinte:
-
Escolha o protocolo de acesso ou a autenticação dos agentes do Kafka no cluster.
-
Se o agente do Kafka usar autenticação SASL/PLAIN, escolha BASIC_AUTH.
-
Se seu agente usar autenticação SASL/SCRAM, escolha um dos protocolos SASL_SCRAM.
-
Se estiver configurando a autenticação mTLS, escolha o protocolo CLIENT_CERTIFICATE_TLS_AUTH.
-
-
Para a autenticação SASL/SCRAM ou mTLS, escolha a chave secreta do Secrets Manager que contém as credenciais de seu cluster do Kafka.
-
-
(Opcional) Em Encryption (Criptografia), escolha o segredo do Secrets Manager que contém o certificado CA raiz que seus agentes do Kafka usam para criptografia TLS, se seus agentes do Kafka usarem certificados assinados por uma CA privada.
Essa configuração se aplica à criptografia TLS para SASL/SCRAM ou SASL/PLAIN e à autenticação MTLs.
-
Para criar o gatilho em um estado desativado para teste (recomendado), desmarque Ativar gatilho. Ou, para habilitar o gatilho imediatamente, selecione Ativar gatilho.
-
-
Para criar o acionador, selecione Add (Adicionar).
Adicionando um cluster Kafka autogerenciado (AWS CLI)
Use os comandos de exemplo da AWS CLI a seguir para criar e visualizar um acionador autogerenciado do Apache Kafka para sua função do Lambda.
Usar SASL/SCRAM
Se usuários do Kafka acessarem seus agentes do Kafka pela Internet, especifique o segredo do Secrets Manager criado para autenticação SASL/SCRAM. O exemplo a seguir usa o comando create-event-source-mapping
da AWS CLI para mapear uma função do Lambda chamada my-kafka-function
em um tópico do Kafka chamado AWSKafkaTopic
.
aws lambda create-event-source-mapping \ --topics
AWSKafkaTopic
\ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333
:secret:MyBrokerSecretName
\ --function-name arn:aws:lambda:us-east-1:111122223333
:function:my-kafka-function
\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092
", "abc2.xyz.com:9092
"]}}'
Usar uma VPC
Se apenas os usuários do Kafka em sua VPC acessarem seus agentes do Kafka, você deverá especificar sua VPC, suas sub-redes e seu grupo de segurança da VPC. O exemplo a seguir usa o comando create-event-source-mapping
da AWS CLI para mapear uma função do Lambda chamada my-kafka-function
em um tópico do Kafka chamado AWSKafkaTopic
.
aws lambda create-event-source-mapping \ --topics
AWSKafkaTopic
\ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333
:function:my-kafka-function
\ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092
", "abc2.xyz.com:9092
"]}}'
Visualizar o status usando a AWS CLI
O exemplo a seguir usa o comando get-event-source-mapping
da AWS CLI para descrever o status do mapeamento de fontes de eventos que você criou.
aws lambda get-event-source-mapping --uuid
dh38738e-992b-343a-1077-3478934hjkfd7
Parâmetros de configuração autogerenciados do Apache Kafka
Todos os tipos de origem de evento Lambda compartilham o mesmoCreateEventSourceMappingeUpdateEventSourceMappingOperações da API. No entanto, apenas alguns dos parâmetros se aplicam ao Apache Kafka.
Parâmetros da fonte de eventos que se aplicam ao Apache Kafka autogerenciado | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Parâmetro | Obrigatório | Padrão | Observações | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BatchSize |
N |
100 |
Máximo: 10.000. |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Habilitado |
N |
Habilitado |
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FunctionName |
Y |
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FilterCriteria |
N |
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumBatchingWindowInSeconds |
N |
500 ms |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SelfManageDeventSource |
Y |
Lista de corretores Kafka. Pode definir apenas em Criar |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SelfManagedKafkaEventSourceConfig |
N |
Contém o campo ConsumerGroupId que assume por padrão um valor exclusivo. |
Pode definir apenas em Criar |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SourceAccessConfigurations |
N |
Nenhuma credencial |
Informações da VPC ou credenciais de autenticação para o cluster Para SASL_PLAIN, defina como BASIC_AUTH |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
StartingPosition |
Y |
|
AT_TIMESTAMP, TRIM_HORIZON, ou LATEST Pode definir apenas em Criar |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
StartingPositionTimestamp |
N |
|
Obrigatório se StartingPosition estiver definido como AT_TIMESTAMP |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
Tópicos |
Y |
|
Nome do tópico Pode definir apenas em Criar |
Usar um cluster do Kafka como uma fonte de eventos
Quando você adiciona seu cluster Apache Kafka como um gatilho para sua função do Lambda, o cluster é usado como umOrigem do evento.
O Lambda lê os dados de eventos dos tópicos do Kafka que você especifica como Topics
em uma solicitação CreateEventSourceMapping com base na StartingPosition
especificada. Após o processamento bem-sucedido, seu tópico do Kafka é confirmado no cluster do Kafka.
Se você especificar StartingPosition
como LATEST
, o Lambda começará a ler da mensagem mais recente em cada partição pertencente ao tópico. Como pode haver algum atraso após a configuração do acionador antes de o Lambda começar a ler as mensagens, o Lambda não lerá nenhuma mensagem produzida durante a janela.
O Lambda processa registros de uma ou mais partições de tópico do Kafka especificadas e envia uma carga útil JSON à função. Quando mais registros estiverem disponíveis, o Lambda continuará processando-os em lotes, com base no valor de BatchSize
especificado na solicitação CreateEventSourceMapping, até que a função atinja o tópico.
Se sua função retorna um erro para qualquer uma das mensagens em um lote, o Lambda tenta novamente todo o lote de mensagens até que o processamento seja bem-sucedido ou as mensagens expiram. É possível enviar registros que apresentaram falha em todas as tentativas a um destino em caso de falha para processamento posterior.
nota
Embora as funções do Lambda normalmente tenham um limite máximo de tempo de 15 minutos, os mapeamentos da origem dos eventos para o Amazon MSK, o Apache Kafka autogerenciado, o Amazon DocumentDB e o Amazon MQ para ActiveMQ e RabbitMQ são compatíveis somente com funções com limites máximos de tempo limite de 14 minutos. Essa restrição garante que o mapeamento da origem do evento possa solucionar adequadamente os erros de função e repetições.
Posições iniciais de sondagem e fluxo
Esteja ciente de que a sondagem do fluxo durante a criação e as atualizações do mapeamento da origem do evento é, finalmente, consistente.
-
Durante a criação do mapeamento da origem do evento, pode levar alguns minutos para a sondagem de eventos do fluxo iniciar.
-
Durante as atualizações do mapeamento da origem do evento, pode levar alguns minutos para interromper e reiniciar a sondagem de eventos do fluxo.
Esse comportamento significa que, se você especificar LATEST
como posição inicial do fluxo, o mapeamento da origem do evento poderá perder eventos durante a criação ou as atualizações. Para garantir que nenhum evento seja perdido, especifique a posição inicial do fluxo como TRIM_HORIZON
ou AT_TIMESTAMP
.
Dimensionamento automático da origem do evento Kafka
Quando você inicialmente cria uma fonte de eventos do Apache Kafka, o Lambda aloca um consumidor para processar todas as partições no tópico do Kafka. Cada consumidor conta com vários processadores em execução em paralelo para lidar com um aumento de workloads. O Lambda aumenta ou reduz a escala na vertical automaticamente do número de consumidores com base na workload. Para preservar a ordenação de mensagens em cada partição, o número máximo de consumidores é um consumidor por partição no tópico.
A cada um minuto, o Lambda avalia o atraso de compensação do consumidor de todas as partições do tópico. Se o atraso for muito alto, a partição está recebendo mensagens mais rápido do que o Lambda pode processá-las. Se necessário, o Lambda adiciona ou remove os consumidores do tópico. O processo de escalabilidade de adicionar ou remover consumidores ocorre em até três minutos após a avaliação.
Se sua função alvo do Lambda estiver sobrecarregada, o Lambda reduz o número de consumidores. Essa ação reduz a workload na função, reduzindo o número de mensagens que os consumidores podem recuperar e enviar para a função.
Para monitorar o throughput do tópico do Kafka, você pode visualizar as métricas de consumo do Apache Kafka, como consumer_lag
econsumer_offset
. Para verificar quantas invocações de função ocorrem em paralelo, você também pode monitorar oMétricas de simultaneidadePara a função do.
Métricas do Amazon CloudWatch
O Lambda emite a métrica OffsetLag
enquanto sua função processa registros. O valor dessa métrica é a diferença de deslocamento entre o último registro gravado no tópico da origem de eventos do Kafka e o último registro que o grupo de consumidores da função processou. Você pode usar OffsetLag
para estimar a latência entre o momento em que um registro é adicionado e o momento em que o grupo de consumidores o processa.
Uma tendência crescente em OffsetLag
pode indicar problemas com pesquisadores no grupo de consumidores da função. Para ter mais informações, consulte Trabalhar com métricas de funções Lambda.