Procesamiento de mensajes de Apache Kafka autoadministrado con Lambda - AWS Lambda

Procesamiento de mensajes de Apache Kafka autoadministrado con Lambda

nota

Si desea enviar datos a un destino que no sea una función de Lambda o enriquecer los datos antes de enviarlos, consulte Amazon EventBridge Pipes (Canalizaciones de Amazon EventBridge).

Agregar un clúster de Kafka como origen de eventos

Para crear una asignación de orígenes de eventos, agregue su clúster de Kafka como un desencadenador de una función de Lambda a través de la consola de Lambda, un AWS SDK, o el AWS Command Line Interface (AWS CLI).

En esta sección se describe cómo crear una asignación de orígenes de eventos mediante la consola de Lambda y la AWS CLI.

Requisitos previos

  • Un clúster de Apache Kafka autoadministrado. Lambda es compatible con Apache Kafka 0.10.1.0 y versiones posteriores.

  • Un rol de ejecución con permiso para acceder a los recursos de AWS que utiliza el clúster de Kafka autoadministrado.

ID del grupo de consumidores personalizable

Al configurar Kafka como origen de eventos, puede especificar un ID de grupo de consumidores. Este ID de grupo de consumidores es un identificador existente para el grupo de consumidores de Kafka al que desea que se una la función de Lambda. Puede utilizar esta característica para migrar sin problemas cualquier configuración de procesamiento de registro de Kafka en curso de otros consumidores a Lambda.

Si especifica un ID de grupo de consumidores y hay otros sondeadores activos dentro de ese grupo de consumidores, Kafka distribuirá los mensajes entre todos los consumidores. En otras palabras, Lambda no recibe todos los mensajes del tema Kafka. Si desea que Lambda gestione todos los mensajes del tema, desactive los demás sondeadores de ese grupo de consumidores.

Además, si especifica un ID de grupo de consumidores y Kafka encuentra un grupo de consumidores existente válido con el mismo ID, Lambda ignora el parámetro StartingPosition para la asignación de orígenes de eventos. En cambio, Lambda comienza a procesar los registros de acuerdo con la compensación comprometida del grupo de consumidores. Si especifica un ID de grupo de consumidores y Kafka no puede encontrar un grupo de consumidores existente, Lambda configura el origen de eventos con el StartingPosition especificado.

El ID del grupo de consumidores que especifique debe ser único entre todos los orígenes de eventos de Kafka. Tras crear una asignación de origen de eventos de Kafka con el ID de grupo de consumidores especificado, no puede actualizar este valor.

Agregar un clúster de Kafka autoadministrado (consola)

Siga estos pasos para agregar su clúster Apache Kafka autoadministrado y un tema Kafka como desencadenador de su función de Lambda.

Para agregar un desencadenador de Apache Kafka a su función de Lambda (consola)
  1. .Abra la página de Functions (Funciones) en la consola de Lambda.

  2. Elija el nombre de su función de Lambda.

  3. En Descripción general de la función, elija Agregar desencadenador.

  4. En Configuración del desencadenador, haga lo siguiente:

    1. Elija el tipo de desencadenador Apache Kafka.

    2. Para los servidores de Bootstrap, ingrese la dirección de host y par de puertos de un broker de Kafka en su clúster y, a continuación, elija Add (Agregar). Repita para cada broker de Kafka en el clúster.

    3. Para el nombre del tema, escriba el nombre del tema Kafka utilizado para almacenar registros en el clúster.

    4. (Opcional) Para Tamaño del lote, introduzca el número máximo de registros que se recibirán en un solo lote.

    5. Para el periodo de lotes, ingrese la cantidad máxima de segundos que Lambda emplea a fin de recopilar registros antes de invocar la función.

    6. (Opcional) Para el ID del grupo de consumidores, ingrese el ID de un grupo de consumidores de Kafka al que unirse.

    7. (Opcional) En Posición inicial, elija Última para empezar a leer el flujo desde el registro más reciente, Horizonte de supresión para comenzar por el registro más antiguo disponible o En la marca de tiempo para especificar una marca de tiempo desde la cual comenzar a leer.

    8. (Opcional) Para VPC, elija Amazon VPC para su clúster de Kafka. A continuación, elija VPC subnets (Subredes de VPC) y VPC security groups (Grupos de seguridad de VPC).

      Esta configuración es obligatoria si solo los usuarios de la VPC acceden a los agentes.

    9. (Opcional) Para Authentication (Autenticación), elija Add (Agregar) y, a continuación, haga lo siguiente:

      1. Elija el protocolo de acceso o autenticación de los agentes de Kafka en su clúster.

        • Si su agente de Kafka utiliza autenticación SASL/PLAIN, elija BASIC_AUTH.

        • Si su agente utiliza autenticación de SASL/SCRAM, elija uno de los protocolos de SASL_SCRAM.

        • Si configura la autenticación de mTLS, elija el protocolo CLIENT_CERTIFICATE_TLS_AUTH.

      2. Para la autenticación de SASL/SCRAM o mTLS, elija la clave secreta de Secrets Manager que contiene las credenciales del clúster de Kafka.

    10. (Opcional) Para Encryption (Cifrado), elija el secreto de Secrets Manager que contiene el certificado de entidad de certificación raíz que los agentes de Kafka utilizan para el cifrado con TLS, si los agentes de Kafka utilizan certificados firmados por una entidad de certificación privada.

      Esta configuración se aplica al cifrado con TLS para SASL/SCRAM o SASL/PLAIN y a la autenticación con mTLS.

    11. Para crear el desencadenador en un estado deshabilitado para la prueba (recomendado), desactive Activar desencadenador. O bien, para habilitar el desencadenador de inmediato, seleccioneActivar desencadenador.

  5. Para crear el desencadenador, elija Add (Añadir).

Agregar un clúster Kafka autoadministrado (AWS CLI)

Utilice los siguientes AWS CLI comandos de ejemplo para crear y ver un desencadenador Apache Kafka autoadminstrado para su función de Lambda.

Uso de SASL/SCRAM

Si los usuarios de Kafka acceden a los agentes de Kafka a través de Internet, especifique el secreto de Secrets Manager que creó para la autenticación con SASL/SCRAM. El siguiente ejemplo utiliza el comando de AWS CLI create-event-source-mapping para asignar una función de Lambda llamada my-kafka-function a un tema de Kafka llamado AWSKafkaTopic.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Uso de una VPC

Si solo los usuarios de Kafka de su VPC acceden a sus agentes de Kafka, debe especificar su VPC, subredes y grupo de seguridad de VPC. El siguiente ejemplo utiliza el comando create-event-source-mapping AWS CLI para mapear una función de Lambda llamada my-kafka-function a un tema de Kafka llamado AWSKafkaTopic.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Visualización del estado mediante la AWS CLI

El siguiente ejemplo utiliza el comando get-event-source-mapping AWS CLI para describir el estado de la asignación de orígenes de eventos que ha creado.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Parámetros de configuración de Apache Kafka autoadministrado

Todos los tipos de fuente de eventos Lambda comparten las mismas operaciones CreateEventSourceMapping y UpdateEventSourceMapping de la API. Sin embargo, solo algunos de los parámetros se aplican a Apache Kafka.

Parámetros de orígenes de eventos que se aplican a Apache Kafka autoadministrado
Parámetro Obligatoria Predeterminado Notas

BatchSize

N

100

Máximo: 10 000

Habilitado

N

Habilitado

FunctionName

Y

FilterCriteria

N

Filtrado de eventos de Lambda

MaximumBatchingWindowInSeconds

N

500 ms

Comportamiento de procesamiento por lotes

SelfManagedEventSource

Y

Lista de agentes de Kafka. Solo se puede establecer en Crear

SelfManagedKafkaEventSourceConfig

N

Contiene el campo ConsumerGroupId, que se establece de forma predeterminada en un valor único.

Solo se puede establecer en Crear

SourceAccessConfigurations

N

Sin credenciales

Información de VPC o credenciales de autenticación para el clúster

Para SASL_PLAIN, establezca en BASIC_AUTH

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON o LATEST

Solo se puede establecer en Crear

StartingPositionTimestamp

N

Obligatorio si StartingPosition se establece en AT_TIMESTAMP

Temas

Y

Nombre del tema

Solo se puede establecer en Crear

Utilizar un clúster de Kafka como origen de eventos

Cuando agrega su clúster de Apache Kafka como desencadenador para su función de Lambda, el clúster se utiliza como origen de eventos.

Lambda lee los datos de eventos de los temas de Kafka que especifique como Topics en una solicitud de CreateEventSourceMapping, en función de la StartingPosition que especifique. Después de un procesamiento exitoso, su tema de Kafka se compromete a su clúster de Kafka.

Si especifica la StartingPosition como LATEST, Lambda comienza a leer el último mensaje de cada partición que pertenece al tema. Debido a que puede haber algún retraso después de la configuración del desencadenador antes de que Lambda comience a leer los mensajes, Lambda no lee ningún mensaje producido durante este plazo.

Lambda procesa registros de una o más particiones de temas de Kafka que especifique y envía una carga JSON a su función. Cuando hay más registros disponibles, Lambda continúa procesando registros en lotes, en función del valor BatchSize que especifique en una solicitud de CreateEventSourceMapping, hasta que la función se ponga al día con el tema.

Si su función devuelve un error para cualquiera de los mensajes de un lote, Lambda reintenta todo el lote de mensajes hasta que el procesamiento sea correcto o los mensajes caduquen. Puede enviar los registros con error en todos los reintentos a un destino en caso de error para su posterior procesamiento.

nota

Si bien las funciones de Lambda suelen tener un límite de tiempo de espera máximo de 15 minutos, las asignaciones de orígenes de eventos para Amazon MSK, Apache Kafka autoadministrado, Amazon DocumentDB y Amazon MQ para ActiveMQ y RabbitMQ solo admiten funciones con límites de tiempo de espera máximos de 14 minutos. Esta restricción garantiza que la asignación de orígenes de eventos pueda gestionar correctamente los errores y reintentos de las funciones.

Posiciones iniciales de flujos y sondeo

Tenga en cuenta que el sondeo de flujos durante la creación y las actualizaciones de la asignación de orígenes de eventos es, en última instancia, coherente.

  • Durante la creación de la asignación de orígenes de eventos, es posible que se demore varios minutos en iniciar el sondeo de los eventos del flujo.

  • Durante las actualizaciones de la asignación de orígenes de eventos, es posible que se demore varios minutos en detener y reiniciar el sondeo de los eventos del flujo.

Este comportamiento significa que, si especifica LATEST como posición inicial del flujo, la asignación de orígenes de eventos podría omitir eventos durante la creación o las actualizaciones. Para garantizar que no se pierda ningún evento, especifique la posición inicial del flujo como TRIM_HORIZON o AT_TIMESTAMP.

Escalado automático del origen de eventos Kafka

Al crear inicialmente un origen de eventos de Apache Kafka, Lambda asigna un consumidor para procesar todas las particiones en el tema de Kafka. Cada consumidor tiene varios procesadores que se ejecutan en paralelo para gestionar el aumento de las cargas de trabajo. Además, Lambda escala o reduce verticalmente de manera automática el número de consumidores, en función de la carga de trabajo. Para conservar el orden de mensajes en cada partición, el número máximo de consumidores es un consumidor por partición en el tema.

En intervalos de un minuto, Lambda evalúa el retraso de compensación del consumidor de todas las particiones del tema. Si el retraso es demasiado alto, la partición recibe mensajes más rápido de lo que Lambda puede procesarlos. Si es necesario, Lambda agrega o elimina a los consumidores del tema. El proceso de escalado para agregar o eliminar consumidores se produce dentro de los tres minutos posteriores a la evaluación.

Si su función Lambda objetivo está sobrecargada, Lambda reduce el número de consumidores. Esta acción reduce la carga de trabajo de la función al reducir el número de mensajes que los consumidores pueden recuperar y enviar a la función.

Para monitorear el rendimiento de su tema de Kafka, puede ver las métricas de consumo de Apache Kafka, como consumer_lag y consumer_offset. Para comprobar cuántas invocaciones de función se producen en paralelo, también puede supervisar las métricas de simultaneidad para su función.

Métricas de Amazon CloudWatch

Lambda emite la métrica OffsetLag mientras la función procesa los registros. El valor de esta métrica es la diferencia de compensación entre el último registro escrito en el tema de origen de eventos de Kafka y el último registro que procesó el grupo de consumidores de su función. Puede utilizar OffsetLag para estimar la latencia entre el momento en el que se agrega un registro y el momento en el que el grupo lo procesa.

Una tendencia ascendente en OffsetLag puede indicar problemas con los sondeadores en el grupo de consumidores de su función. Para obtener más información, consulte Uso de métricas de funciones de Lambda.