Uso de Lambda con Apache Kafka autoadministrado - AWS Lambda

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Uso de Lambda con Apache Kafka autoadministrado

nota

Si desea enviar datos a un destino que no sea una función de Lambda o enriquecer los datos antes de enviarlos, consulte Amazon EventBridge Pipes (Canalizaciones de Amazon EventBridge).

Lambda admite Apache Kafka como una fuente del evento. Apache Kafka es una plataforma de secuencia de eventos de código abierto que admite cargas de trabajo como canalizaciones de datos y análisis de streaming.

Puede utilizar el servicio Kafka administrado por AWS Amazon Managed Streaming for Apache Kafka (Amazon MSK) o un clúster de Kafka autoadministrado. Para conocer detalles sobre el uso de Lambda con Amazon MSK, consulte Uso de Lambda con Amazon MSK.

En este tema se describe cómo utilizar Lambda con un clúster de Kafka autoadministrado. En la terminología de AWS, un clúster autoadministrado incluye clústeres Kafka alojados que no son de AWS. Por ejemplo, puede alojar su clúster de Kafka con un proveedor de servicios en la nube, como Confluent Cloud.

Apache Kafka como fuente de eventos funciona de manera similar a utilizar Amazon Simple Queue Service (Amazon SQS) o Amazon Kinesis. Lambda sondea internamente nuevos mensajes de la fuente del evento y luego invoca sincrónicamente la función Lambda objetivo. Lambda lee los mensajes en lotes y los proporciona a su función como carga de eventos. El tamaño máximo del lote es configurable. (El valor predeterminado es 100 mensajes).

Para los orígenes de eventos basados en Kafka, Lambda admite parámetros de control de procesamiento, como los plazos de procesamiento por lotes y el tamaño del lote. Para obtener más información, consulte Comportamiento de procesamiento por lotes.

Para ver un ejemplo de cómo utilizar Kafka autoadministrado como fuente de eventos, consulte Uso de Apache Kafka autoalojado como fuente de eventos para AWS Lambda en el blog de informática de AWS.

Evento de ejemplo

Lambda envía el lote de mensajes en el parámetro de evento cuando invoca su función de Lambda. La carga de eventos contiene una matriz de mensajes. Cada elemento de la matriz contiene detalles del tema Kafka y el identificador de partición Kafka, junto con una marca de tiempo y un mensaje codificado en base64.

{ "eventSource": "SelfManagedKafka", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Autenticación de clústeres de Kafka

Lambda admite varios métodos para autenticarse con su clúster de Apache Kafka autoadministrado. Asegúrese de configurar el clúster de Kafka para que utilice uno de estos métodos de autenticación admitidos: Para obtener más información acerca de la seguridad de Kafka, consulte la sección Security (Seguridad) de la documentación de Kafka.

Acceso mediante VPC

Si solo los usuarios de Kafka de su VPC acceden a sus agentes de Kafka, debe configurar el origen de eventos de Kafka para el acceso a Amazon Virtual Private Cloud (Amazon VPC).

Autenticación SASL/SCRAM

Lambda es compatible con la autenticación simple y la autenticación de capa de seguridad/mecanismo de autenticación de respuesta por desafío saltado (SASL/SCRAM) con cifrado de seguridad de la capa de transporte (TLS) (SASL_SSL). Lambda envía las credenciales cifradas para autenticarse con el clúster. Lambda no es compatible con SASL/SCRAM con texto simple (SASL_PLAINTEXT). Para obtener más información acerca de la autenticación SASL/SCRAM, consulte RFC 5802.

Lambda también admite la autenticación SASL/PLAIN. Dado que este mecanismo utiliza credenciales en texto claro, la conexión con el servidor debe utilizar cifrado TLS para garantizar la protección de las credenciales.

Para la autenticación SASL, almacene las credenciales de inicio de sesión como secreto en AWS Secrets Manager. Para obtener más información sobre cómo utilizar Secrets Manager, consulte Tutorial: Cree y recupere un secreto en la Guía del usuario de AWS Secrets Manager.

importante

Para utilizar Secrets Manager para la autenticación, los secretos deben almacenarse en la misma región de AWS que la función de Lambda.

Autenticación TLS mutua

TLS mutua (mTLS) proporciona autenticación bidireccional entre el cliente y el servidor. El cliente envía un certificado al servidor para que el servidor verifique el cliente, mientras que el servidor envía un certificado al cliente para que el cliente verifique el servidor.

En Apache Kafka autoadministrado, Lambda actúa como cliente. Puede configurar un certificado de cliente (como secreto en Secrets Manager) para autenticar a Lambda con los agentes de Kafka. El certificado de cliente debe estar firmado por una entidad de certificación en el almacén de confianza del servidor.

El clúster de Kafka envía un certificado de servidor a Lambda para autenticar a los agentes de Kafka con Lambda. El certificado de servidor puede ser un certificado de entidad de certificación pública o un certificado autofirmado o de entidad de certificación privada. El certificado de entidad de certificación pública debe estar firmado por una entidad de certificación que esté en el almacén de confianza de Lambda. Para un certificado autofirmado o de entidad de certificación privada, configure el certificado de entidad de certificación raíz del servidor (como secreto en Secrets Manager). Lambda utiliza el certificado raíz para verificar los agentes de Kafka.

Para obtener más información acerca de mTLS, consulte Introducing mutual TLS authentication for Amazon MSK as an event source (Presentación de la autenticación de TLS mutua para Amazon MSK como origen de eventos).

Configuración del secreto de certificado de cliente

El secreto CLIENT_CERTIFICATE_TLS_AUTH requiere un campo de certificado y un campo de clave privada. Para una clave privada cifrada, el secreto requiere una contraseña de clave privada. El certificado y la clave privada deben estar en formato PEM.

nota

Lambda admite los algoritmos de cifrado de claves privadas PBES1 (pero no PBES2).

El campo de certificado debe contener una lista de certificados y debe comenzar por el certificado de cliente, seguido de cualquier certificado intermedio, y finalizar con el certificado raíz. Cada certificado debe comenzar en una nueva línea con la siguiente estructura:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager admite secretos de hasta 65 536 bytes, que supone suficiente espacio para cadenas de certificados largas.

El formato de la clave privada debe ser PKCS #8, con la siguiente estructura:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

Para una clave privada cifrada, utilice la siguiente estructura:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

El siguiente ejemplo muestra el contenido de un secreto para la autenticación de mTLS mediante una clave privada cifrada. Para una clave privada cifrada, incluya la contraseña de la clave privada en el secreto.

{"privateKeyPassword":"testpassword", "certificate":"-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey":"-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Configuración del secreto de certificado de entidad de certificación raíz del servidor

Cree este secreto si sus agentes de Kafka utilizan cifrado TLS con certificados firmados por una entidad de certificación privada. Puede utilizar el cifrado TLS para autenticación VPC, SASL/SCRAM, SASL/PLAIN o mTLS.

El secreto de certificado de entidad de certificación raíz del servidor requiere un campo que contenga el certificado de entidad de certificación raíz del agente de Kafka en formato PEM. La estructura del secreto se muestra en el ejemplo siguiente.

{"certificate":"-----BEGIN CERTIFICATE----- MIID7zCCAtegAwIBAgIBADANBgkqhkiG9w0BAQsFADCBmDELMAkGA1UEBhMCVVMx EDAOBgNVBAgTB0FyaXpvbmExEzARBgNVBAcTClNjb3R0c2RhbGUxJTAjBgNVBAoT HFN0YXJmaWVsZCBUZWNobm9sb2dpZXMsIEluYy4xOzA5BgNVBAMTMlN0YXJmaWVs ZCBTZXJ2aWNlcyBSb290IENlcnRpZmljYXRlIEF1dG... -----END CERTIFICATE-----" }

Administración del acceso de la API y los permisos

Además de acceder al clúster de Kafka autoadministrado, la función de Lambda necesita permisos para llevar a cabo varias acciones de la API. Los permisos se agregan al rol de ejecución de la función. Si los usuarios tienen que acceder a cualquier acción de la API, agregue los permisos necesarios a la política de identidad para el usuario o rol de AWS Identity and Access Management (IAM).

Permisos de función de Lambda necesarios

Para crear y almacenar registros en un grupo de registros en Registros de Amazon CloudWatch, la función de Lambda debe tener los siguientes permisos en su rol de ejecución:

Permisos de función de Lambda opcionales

Es posible que la función de Lambda también necesite permisos para:

  • Describir el secreto de Secrets Manager.

  • Acceder a su clave administrada por el cliente de AWS Key Management Service (AWS KMS).

  • Acceder a su Amazon VPC.

  • Envíe los registros de las invocaciones fallidas a un destino.

Secrets Manager y permisos de AWS KMS

En función del tipo de control de acceso que configure para los agentes de Kafka, es posible que la función de Lambda necesite permiso para acceder a su secreto de Secrets Manager o para descifrar su clave administrada por el cliente de AWS KMS. Para acceder a estos recursos, el rol de ejecución de la función debe tener los siguientes permisos:

Permisos de VPC

Si solo los usuarios de una VPC pueden acceder a su clúster de Apache Kafka autoadministrado, su función de Lambda debe tener permiso para acceder a sus recursos de Amazon VPC. Estos recursos incluyen su VPC, subredes, grupos de seguridad e interfaces de red. Para acceder a estos recursos, el rol de ejecución de la función debe tener los siguientes permisos:

Envío de registros a un destino

Si desea enviar registros de invocaciones fallidas a un destino en caso de error, la función de Lambda debe tener permiso para enviar estos registros. Para las asignaciones de orígenes de eventos de Kafka, puede elegir entre un tema de Amazon SNS, una cola de Amazon SQS o un bucket de Amazon S3 como destino. Para enviar registros a un tema de SNS, el rol de ejecución de la función debe tener el siguiente permiso:

Para enviar registros a una cola de SQS, el rol de ejecución de la función debe tener el siguiente permiso:

Para enviar registros a un bucket de S3, el rol de ejecución de la función debe tener los siguientes permisos:

Además, si configuró una clave de KMS en su destino, Lambda necesita los siguientes permisos según el tipo de destino:

  • Si tiene activado el cifrado con su propia clave de KMS para un destino de S3, necesitará kms:GenerateDataKey. Si la clave de KMS y el destino del bucket de S3 están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:GenerateDataKey.

  • Si tiene activado el cifrado con su propia clave de KMS para un destino de SQS, necesitará kms:Decrypt y kms:GenerateDataKey. Si la clave de KMS y el destino de la cola de SQS están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey y kms:ReEncrypt.

  • Si tiene activado el cifrado con su propia clave de KMS para un destino de SNS, necesitará kms:Decrypt y kms:GenerateDataKey. Si la clave de KMS y el destino del tema de SNS están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey y kms:ReEncrypt.

Adición de permisos a su rol de ejecución

Para tener acceso a otros servicios de AWS que su clúster Apache Kafka autoadministrado utiliza, Lambda utiliza las políticas de permisos que defina en el rol de ejecución de la función de Lambda.

De forma predeterminada, Lambda no está permitido realizar las acciones necesarias u opcionales para un clúster Apache Kafka autoadministrado. Debe crear y definir estas acciones en una política de confianza de IAM y, a continuación, adjuntarla a su rol de ejecución. En este ejemplo se muestra cómo puede crear una política que permita a Lambda tener acceso a los recursos de Amazon VPC.

{ "Version":"2012-10-17", "Statement":[ { "Effect":"Allow", "Action":[ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups" ], "Resource":"*" } ] }

Para obtener información sobre cómo crear un documento de política de JSON en la consola de IAM, consulte Creación de políticas en la pestaña JSON en la Guía del usuario de IAM.

Concesión de acceso a los usuarios con una política de IAM

De forma predeterminada, los usuarios y roles no tienen permiso para llevar a cabo operaciones de API de origen de eventos. Para conceder acceso a los usuarios de su organización o cuenta, cree o actualice la política basada en identidades. Para obtener más información, consulte Control del acceso a los recursos de AWS mediante políticas en la Guía del usuario de IAM.

Errores de autenticación y autorización

Si falta alguno de los permisos necesarios para consumir datos del clúster de Kafka, Lambda muestra uno de los siguientes mensajes de error en la asignación de orígenes de eventos en LastProcessingResult.

Error de autorización del clúster a Lambda

Para SASL/SCRAM o mTLS, este error indica que el usuario proporcionado no tiene todos los permisos de lista de control de acceso (ACL) de Kafka necesarios que se indican a continuación:

  • Clúster DescribeConfigs

  • Descripción del grupo

  • Grupo de lectura

  • Descripción del tema

  • Tema de lectura

Cuando crea las ACL de Kafka con los permisos de kafka-cluster necesarios, debe especificar el tema y el grupo como recursos. El nombre del tema debe coincidir con el tema de la asignación de origen de eventos. El nombre del grupo debe coincidir con el UUID de la asignación de origen de eventos.

Después de agregar los permisos necesarios al rol de ejecución, pueden pasar varios minutos hasta que los cambios surtan efecto.

Error de autenticación de SASL

Para SASL/SCRAM o SASL/PLAIN, este error indica que las credenciales de inicio de sesión proporcionadas no son válidas.

El servidor no pudo autenticar Lambda

Este error indica que el agente de Kafka no ha podido autenticar Lambda. Este error puede producirse por cualquiera de las razones siguientes:

  • No proporcionó ningún certificado de cliente para la autenticación de mTLS.

  • Proporcionó un certificado de cliente, pero los agentes de Kafka no están configurados para utilizar la autenticación de mTLS.

  • Los agentes de Kafka no confían en el certificado de cliente.

Lambda no ha podido autenticar el servidor

Este error indica que Lambda no ha podido autenticar el agente de Kafka. Este error puede producirse por cualquiera de las razones siguientes:

  • Los agentes de Kafka utilizan certificados autofirmados o una entidad de certificación privada, pero no proporcionaron el certificado de entidad de certificación raíz del servidor.

  • El certificado de entidad de certificación raíz del servidor no coincide con la entidad de certificación raíz que firmó el certificado del agente.

  • No se pudo validar el nombre de host porque el certificado del agente no contiene el nombre DNS ni la dirección IP del agente como nombre alternativo de sujeto.

El certificado o la clave privada proporcionados no son válidos

Este error indica que el consumidor de Kafka no ha podido utilizar la clave privada ni el certificado proporcionados. Asegúrese de que el formato del certificado y la clave sea PEM, y de que el cifrado de clave privada utilice un algoritmo PBES1.

Configuración de red

Si configura el acceso de Amazon VPC a sus agentes de Kafka, Lambda debe tener acceso a los recursos de Amazon VPC en los que reside el clúster de Kafka. Le recomendamos que implemente puntos de conexión de VPC AWS PrivateLink para Lambda y AWS Security Token Service (AWS STS). Si el agente utiliza autenticación, implemente también un punto de conexión de VPC para Secrets Manager. Si configuró un destino en caso de error, implemente también un punto de conexión de VPC para el servicio de destino.

De manera alternativa, asegúrese de que la VPC asociada a su clúster de Kafka incluya una gateway NAT por subred pública. Para obtener más información, consulte Acceso a Internet y a los servicios para funciones conectadas a la VPC.

Si usa puntos de enlace de VPC, también debe configurarlos para habilitar los nombres DNS privados.

Configure sus grupos de seguridad de Amazon VPC con las siguientes reglas (como mínimo):

  • Reglas de entrada: permiten todo el tráfico en el puerto del agente de Kafka para los grupos de seguridad especificados para el origen de eventos. De forma predeterminada, Kafka utiliza el puerto 9092.

  • Reglas de salida: permiten todo el tráfico en el puerto 443 para todos los destinos. Permiten todo el tráfico en el puerto del agente de Kafka para los grupos de seguridad especificados para el origen de eventos. De forma predeterminada, Kafka utiliza el puerto 9092.

  • Si utiliza puntos de conexión de VPC en lugar de una puerta de enlace NAT, los grupos de seguridad asociados a los puntos de conexión de VPC deben permitir todo el tráfico entrante en el puerto 443 desde los grupos de seguridad del origen de eventos.

Para obtener más información sobre la configuración de la red, consulte Configuración de AWS Lambda con un clúster Apache Kafka dentro de una VPC en el Blog de informática de AWS.

Agregar un clúster Kafka como fuente de eventos

Para crear un mapeo de fuente de eventos, agregue su clúster Kafka como un desencadenador de una función de Lambda a través de la consola de Lambda, una AWS SDK, o elAWS Command Line Interface (AWS CLI).

En esta sección se describe cómo crear una asignación de orígenes de eventos mediante la consola de Lambda y la AWS CLI.

nota

Cuando actualice, deshabilite o elimine una asignación de orígenes de eventos para Apache Kafka autoadministrado, los cambios pueden tardar hasta 15 minutos en surtir efecto. Antes de que haya transcurrido este periodo, la asignación de orígenes de eventos puede continuar con el procesamiento de los eventos y la invocación de la función con la configuración anterior. Esto es cierto incluso cuando el estado de la asignación de orígenes de eventos que se muestra en la consola indica que se aplicaron los cambios.

Requisitos previos

  • Un clúster de Apache Kafka autoadministrado. Lambda es compatible con Apache Kafka 0.10.1.0 y versiones posteriores.

  • Un rol de ejecución con permiso para acceder a los recursos de AWS que utiliza el clúster de Kafka autoadministrado.

ID del grupo de consumidores personalizable

Al configurar Kafka como origen de eventos, puede especificar un ID de grupo de consumidores. Este ID de grupo de consumidores es un identificador existente para el grupo de consumidores de Kafka al que desea que se una la función de Lambda. Puede utilizar esta característica para migrar sin problemas cualquier configuración de procesamiento de registro de Kafka en curso de otros consumidores a Lambda.

Si especifica un ID de grupo de consumidores y hay otros sondeadores activos dentro de ese grupo de consumidores, Kafka distribuirá los mensajes entre todos los consumidores. En otras palabras, Lambda no recibe todos los mensajes del tema Kafka. Si desea que Lambda gestione todos los mensajes del tema, desactive los demás sondeadores de ese grupo de consumidores.

Además, si especifica un ID de grupo de consumidores y Kafka encuentra un grupo de consumidores existente válido con el mismo ID, Lambda ignora el parámetro StartingPosition para la asignación de orígenes de eventos. En cambio, Lambda comienza a procesar los registros de acuerdo con la compensación comprometida del grupo de consumidores. Si especifica un ID de grupo de consumidores y Kafka no puede encontrar un grupo de consumidores existente, Lambda configura el origen de eventos con el StartingPosition especificado.

El ID del grupo de consumidores que especifique debe ser único entre todos los orígenes de eventos de Kafka. Tras crear una asignación de orígenes de eventos de Kafka con el ID de grupo de consumidores especificado, no puede actualizar este valor.

Destinos en caso de error

Para retener los registros de las invocaciones fallidas o de las cargas útiles sobredimensionadas de su origen de eventos de Kafka, configure un destino en caso de error para su función. Cuando se produce un error en una invocación, Lambda envía un registro JSON que contiene detalles de la invocación a su destino.

Puede elegir entre un tema de Amazon SNS, una cola de Amazon SQS o un bucket de Amazon S3 como su destino. Para los destinos de temas de SNS o colas de SQS, Lambda envía los metadatos del registro al destino. Para buckets de S3 de destino, Lambda envía todo el registro de invocación junto con los metadatos al destino.

Para que Lambda envíe correctamente los registros al destino elegido, asegúrese de que el rol de ejecución de la función contenga los permisos pertinentes. En la tabla también se describe cómo cada tipo de destino recibe el registro de invocación de JSON.

Tipo de destino Compatible con los siguientes orígenes de eventos Permisos necesarios Formato JSON específico del destino

Cola de Amazon SQS

  • Kinesis

  • DynamoDB

  • Apache Kafka autoadministrado y Apache Kafka administrado

Lambda pasa los metadatos del registro de invocación como Message al destino.

Tema de Amazon SNS

  • Kinesis

  • DynamoDB

  • Apache Kafka autoadministrado y Apache Kafka administrado

Lambda pasa los metadatos del registro de invocación como Message al destino.

Bucket de Amazon S3

  • Apache Kafka autoadministrado y Apache Kafka administrado

Lambda almacena el registro de invocación junto con sus metadatos en el destino.

sugerencia

Como práctica recomendada, incluya los permisos mínimos requeridos solo en su rol de ejecución.

Destinos SNS y SQS

El siguiente ejemplo muestra lo que Lambda envía a un destino de tema de SNS o cola de SQS cuando se produce un error en la invocación de un origen de eventos de Kafka. Cada una de las claves en recordsInfo contiene el tema y la partición de Kafka, separados por un guion. Por ejemplo, para la clave "Topic-0", Topic es el tema de Kafka, y 0 es la partición. Para cada tema y partición, puede usar los desplazamientos y los datos de las marcas de tiempo para buscar los registros de invocación originales.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Destinos de S3

Para los destinos de S3, Lambda envía todo el registro de invocación junto con los metadatos al destino. El siguiente ejemplo muestra lo que Lambda envía a un bucket de S3 de destino cuando se produce un error en la invocación de un origen de evento de Kafka. Además de todos los campos del ejemplo anterior para los destinos de SQS y SNS, el campo payload contiene el registro de invocación original en forma de cadena JSON de escape.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
sugerencia

También se recomienda habilitar el control de versiones de S3 en el bucket de destino.

Configuración de destinos en caso de error

Para configurar un destino en caso de error mediante la consola, siga estos pasos:

  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija una función.

  3. En Descripción general de la función, elija Agregar destino.

  4. En Origen, elija Invocación de asignación de orígenes de eventos.

  5. Para la Asignación de orígenes de eventos, elija un origen de eventos que esté configurado para esta función.

  6. En Condición, seleccione En caso de error. Para las invocaciones de asignación de orígenes de eventos, esta es la única condición aceptada.

  7. En Tipo de destino, elija el tipo de destino al que Lambda envía los registros de invocación.

  8. En Destino, elija un recurso.

  9. Elija Guardar.

También puede configurar un destino en caso de error mediante la API de Lambda. Por ejemplo, el siguiente comando de la CLI CreateEventSourceMapping agrega un destino de SQS en caso de error a MyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

El siguiente comando de la CLI UpdateEventSourceMapping agrega un destino S3 en caso de error al origen de eventos de Kafka asociado a la entrada uuid:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Para eliminar un destino, introduzca una cadena vacía como argumento del parámetro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Agregar un clúster de Kafka autoadministrado (consola)

Siga estos pasos para agregar su clúster Apache Kafka autoadministrado y un tema Kafka como desencadenador de su función de Lambda.

Para agregar un desencadenador de Apache Kafka a su función de Lambda (consola)
  1. .Abra la página de Functions (Funciones) en la consola de Lambda.

  2. Elija el nombre de su función de Lambda.

  3. En Descripción general de la función, elija Agregar desencadenador.

  4. En Configuración del desencadenador, haga lo siguiente:

    1. Elija el tipo de desencadenador Apache Kafka.

    2. Para los servidores de Bootstrap, ingrese la dirección de host y par de puertos de un broker de Kafka en su clúster y, a continuación, elija Add (Agregar). Repita para cada broker de Kafka en el clúster.

    3. Para el nombre del tema, escriba el nombre del tema Kafka utilizado para almacenar registros en el clúster.

    4. (Opcional) Para Tamaño del lote, introduzca el número máximo de registros que se recibirán en un solo lote.

    5. Para el periodo de lotes, ingrese la cantidad máxima de segundos que Lambda emplea a fin de recopilar registros antes de invocar la función.

    6. (Opcional) Para el ID del grupo de consumidores, ingrese el ID de un grupo de consumidores de Kafka al que unirse.

    7. (Opcional) En Posición inicial, elija Última para empezar a leer el flujo desde el registro más reciente, Horizonte de supresión para comenzar por el registro más antiguo disponible o En la marca de tiempo para especificar una marca de tiempo desde la cual comenzar a leer.

    8. (Opcional) Para VPC, elija Amazon VPC para su clúster de Kafka. A continuación, elija VPC subnets (Subredes de VPC) y VPC security groups (Grupos de seguridad de VPC).

      Esta configuración es obligatoria si solo los usuarios de la VPC acceden a los agentes.

    9. (Opcional) Para Authentication (Autenticación), elija Add (Agregar) y, a continuación, haga lo siguiente:

      1. Elija el protocolo de acceso o autenticación de los agentes de Kafka en su clúster.

        • Si su agente de Kafka utiliza autenticación SASL/PLAIN, elija BASIC_AUTH.

        • Si su agente utiliza autenticación de SASL/SCRAM, elija uno de los protocolos de SASL_SCRAM.

        • Si configura la autenticación de mTLS, elija el protocolo CLIENT_CERTIFICATE_TLS_AUTH.

      2. Para la autenticación de SASL/SCRAM o mTLS, elija la clave secreta de Secrets Manager que contiene las credenciales del clúster de Kafka.

    10. (Opcional) Para Encryption (Cifrado), elija el secreto de Secrets Manager que contiene el certificado de entidad de certificación raíz que los agentes de Kafka utilizan para el cifrado con TLS, si los agentes de Kafka utilizan certificados firmados por una entidad de certificación privada.

      Esta configuración se aplica al cifrado con TLS para SASL/SCRAM o SASL/PLAIN y a la autenticación con mTLS.

    11. Para crear el desencadenador en un estado deshabilitado para la prueba (recomendado), desactive Activar desencadenador. O bien, para habilitar el desencadenador de inmediato, seleccioneActivar desencadenador.

  5. Para crear el desencadenador, elija Add (Añadir).

Agregar un clúster Kafka autoadministrado (AWS CLI)

Utilice los siguientes AWS CLI comandos de ejemplo para crear y ver un desencadenador Apache Kafka autoadminstrado para su función de Lambda.

Uso de SASL/SCRAM

Si los usuarios de Kafka acceden a los agentes de Kafka a través de Internet, especifique el secreto de Secrets Manager que creó para la autenticación con SASL/SCRAM. El siguiente ejemplo utiliza el comando de AWS CLI create-event-source-mapping para asignar una función de Lambda llamada my-kafka-function a un tema de Kafka llamado AWSKafkaTopic.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration Type=SASL_SCRAM_512_AUTH,URI=arn:aws:secretsmanager:us-east-1:111122223333:secret:MyBrokerSecretName \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Uso de una VPC

Si solo los usuarios de Kafka de su VPC acceden a sus agentes de Kafka, debe especificar su VPC, subredes y grupo de seguridad de VPC. El siguiente ejemplo utiliza el comando create-event-source-mapping AWS CLI para mapear una función de Lambda llamada my-kafka-function a un tema de Kafka llamado AWSKafkaTopic.

aws lambda create-event-source-mapping \ --topics AWSKafkaTopic \ --source-access-configuration '[{"Type": "VPC_SUBNET", "URI": "subnet:subnet-0011001100"}, {"Type": "VPC_SUBNET", "URI": "subnet:subnet-0022002200"}, {"Type": "VPC_SECURITY_GROUP", "URI": "security_group:sg-0123456789"}]' \ --function-name arn:aws:lambda:us-east-1:111122223333:function:my-kafka-function \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc3.xyz.com:9092", "abc2.xyz.com:9092"]}}'

Visualización del estado mediante la AWS CLI

El siguiente ejemplo utiliza el comando get-event-source-mapping AWS CLI para describir el estado de la asignación de orígenes de eventos que ha creado.

aws lambda get-event-source-mapping --uuid dh38738e-992b-343a-1077-3478934hjkfd7

Utilizar un clúster Kafka como fuente de eventos

Cuando agrega su clúster Apache Kafka como desencadenador para su función de Lambda, el clúster se utiliza como fuente del evento.

Lambda lee los datos de eventos de los temas de Kafka que especifique como Topics en una solicitud de CreateEventSourceMapping, en función de la StartingPosition que especifique. Después de un procesamiento exitoso, su tema de Kafka se compromete a su clúster de Kafka.

Si especifica la StartingPosition como LATEST, Lambda comienza a leer el último mensaje de cada partición que pertenece al tema. Debido a que puede haber algún retraso después de la configuración del desencadenador antes de que Lambda comience a leer los mensajes, Lambda no lee ningún mensaje producido durante este plazo.

Lambda procesa registros de una o más particiones de temas de Kafka que especifique y envía una carga JSON a su función. Cuando hay más registros disponibles, Lambda continúa procesando registros en lotes, en función del valor BatchSize que especifique en una solicitud de CreateEventSourceMapping, hasta que la función se ponga al día con el tema.

Si su función devuelve un error para cualquiera de los mensajes de un lote, Lambda reintenta todo el lote de mensajes hasta que el procesamiento sea correcto o los mensajes caduquen. Puede enviar los registros con error en todos los reintentos a un destino en caso de error para su posterior procesamiento.

nota

Si bien las funciones de Lambda suelen tener un límite de tiempo de espera máximo de 15 minutos, las asignaciones de orígenes de eventos para Amazon MSK, Apache Kafka autoadministrado, Amazon DocumentDB y Amazon MQ para ActiveMQ y RabbitMQ solo admiten funciones con límites de tiempo de espera máximos de 14 minutos. Esta restricción garantiza que la asignación de orígenes de eventos pueda gestionar correctamente los errores y reintentos de las funciones.

Posiciones iniciales de flujos y sondeo

Tenga en cuenta que el sondeo de flujos durante la creación y las actualizaciones de la asignación de orígenes de eventos es, en última instancia, coherente.

  • Durante la creación de la asignación de orígenes de eventos, es posible que se demore varios minutos en iniciar el sondeo de los eventos del flujo.

  • Durante las actualizaciones de la asignación de orígenes de eventos, es posible que se demore varios minutos en detener y reiniciar el sondeo de los eventos del flujo.

Este comportamiento significa que, si especifica LATEST como posición inicial del flujo, la asignación de orígenes de eventos podría omitir eventos durante la creación o las actualizaciones. Para garantizar que no se pierda ningún evento, especifique la posición inicial del flujo como TRIM_HORIZON o AT_TIMESTAMP.

Escalado automático de la fuente de eventos Kafka

Al crear inicialmente un origen de eventos de Apache Kafka, Lambda asigna un consumidor para procesar todas las particiones en el tema de Kafka. Cada consumidor tiene varios procesadores que se ejecutan en paralelo para gestionar el aumento de las cargas de trabajo. Además, Lambda escala o reduce verticalmente de manera automática el número de consumidores, en función de la carga de trabajo. Para conservar el orden de mensajes en cada partición, el número máximo de consumidores es un consumidor por partición en el tema.

En intervalos de un minuto, Lambda evalúa el retraso de compensación del consumidor de todas las particiones del tema. Si el retraso es demasiado alto, la partición recibe mensajes más rápido de lo que Lambda puede procesarlos. Si es necesario, Lambda agrega o elimina a los consumidores del tema. El proceso de escalado para agregar o eliminar consumidores se produce dentro de los tres minutos posteriores a la evaluación.

Si su función Lambda objetivo está sobrecargada, Lambda reduce el número de consumidores. Esta acción reduce la carga de trabajo de la función al reducir el número de mensajes que los consumidores pueden recuperar y enviar a la función.

Para monitorear el rendimiento de su tema de Kafka, puede ver las métricas de consumo de Apache Kafka, como consumer_lag y consumer_offset. Para comprobar cuántas invocaciones de función se producen en paralelo, también puede supervisar las métricas de simultaneidad para su función.

Operaciones de API de origen de eventos

Cuando agrega su clúster de Kafka como una fuente de eventos para su función de Lambda mediante la consola de Lambda, un AWS SDK o la AWS CLI, Lambda utiliza API para procesar su solicitud.

Para administrar un origen de eventos con la AWS Command Line Interface (AWS CLI) o AWS SDK, puede utilizar las siguientes operaciones de la API:

Errores de asignación de orígenes de eventos

Cuando agrega el clúster de Apache Kafka como una fuente de eventos para su función de Lambda, si su función encuentra un error, su consumidor de Kafka deja de procesar registros. Los consumidores de una partición de tema son aquellos que se suscriben, leen y procesan sus registros. Sus otros consumidores de Kafka pueden continuar procesando registros, siempre que no encuentren el mismo error.

Para determinar la causa de un consumidor detenido, compruebe el StateTransitionReason campo en la respuesta de EventSourceMapping. En la siguiente lista se describen los errores de origen de eventos que puede recibir:

ESM_CONFIG_NOT_VALID

La configuración de asignación de orígenes de eventos no es válida.

EVENT_SOURCE_AUTHN_ERROR

Lambda no pudo autenticar el origen de eventos.

EVENT_SOURCE_AUTHZ_ERROR

Lambda no tiene los permisos necesarios para acceder al origen de eventos.

FUNCTION_CONFIG_NOT_VALID

La configuración de la función no es válida.

nota

Si los registros de eventos de Lambda superan el límite de tamaño permitido de 6 MB, pueden no procesarse.

Métricas de Amazon CloudWatch

Lambda emite la métrica OffsetLag mientras la función procesa los registros. El valor de esta métrica es la diferencia de compensación entre el último registro escrito en el tema de origen de eventos de Kafka y el último registro que procesó el grupo de consumidores de su función. Puede utilizar OffsetLag para estimar la latencia entre el momento en el que se agrega un registro y el momento en el que el grupo lo procesa.

Una tendencia ascendente en OffsetLag puede indicar problemas con los sondeadores en el grupo de consumidores de su función. Para obtener más información, consulte Uso de métricas de funciones de Lambda.

Parámetros de configuración de Apache Kafka autoadministrado

Todos los tipos de fuente de eventos Lambda comparten las mismas operaciones CreateEventSourceMapping y UpdateEventSourceMapping de la API. Sin embargo, solo algunos de los parámetros se aplican a Apache Kafka.

Parámetros de fuentes de eventos que se aplican a Apache Kafka autoadministrado
Parámetro Obligatoria Predeterminado Notas

BatchSize

N

100

Máximo: 10 000

Habilitado

N

Habilitado

FunctionName

Y

FilterCriteria

N

Filtrado de eventos de Lambda

MaximumBatchingWindowInSeconds

N

500 ms

Comportamiento de procesamiento por lotes

SelfManagedEventSource

Y

Lista de agentes de Kafka. Solo se puede establecer en Crear

SelfManagedKafkaEventSourceConfig

N

Contiene el campo ConsumerGroupId, que se establece de forma predeterminada en un valor único.

Solo se puede establecer en Crear

SourceAccessConfigurations

N

Sin credenciales

Información de VPC o credenciales de autenticación para el clúster

Para SASL_PLAIN, establezca en BASIC_AUTH

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON o LATEST

Solo se puede establecer en Crear

StartingPositionTimestamp

N

Obligatorio si StartingPosition se establece en AT_TIMESTAMP

Temas

Y

Nombre del tema

Solo se puede establecer en Crear