Uso de Lambda con Amazon MSK - AWS Lambda

Uso de Lambda con Amazon MSK

nota

Si desea enviar datos a un destino que no sea una función de Lambda o enriquecer los datos antes de enviarlos, consulte Amazon EventBridge Pipes (Canalizaciones de Amazon EventBridge).

Amazon Managed Streaming para Apache Kafka (Amazon MSK) es un servicio completamente administrado que le permite crear y ejecutar aplicaciones que utilizan Apache Kafka para procesar datos de transmisión. Amazon MSK simplifica la configuración, el escalado y la administración de clústeres que ejecutan Kafka. Amazon MSK también facilita la configuración de la aplicación para varias zonas de disponibilidad y para la seguridad con AWS Identity and Access Management (IAM). Amazon MSK es compatible con múltiples versiones de código abierto de Kafka.

Amazon MSK como fuente de eventos funciona de manera similar al uso de Amazon Simple Queue Service (Amazon SQS) o Amazon Kinesis. Lambda sondea internamente nuevos mensajes de la fuente del evento y luego invoca sincrónicamente la función de Lambda objetivo. Lambda lee los mensajes en lotes y los proporciona a su función como carga de eventos. El tamaño máximo del lote se puede configurar (el valor predeterminado son 100 mensajes). Para obtener más información, consulte Comportamiento de procesamiento por lotes.

nota

Si bien las funciones de Lambda suelen tener un límite de tiempo de espera máximo de 15 minutos, las asignaciones de orígenes de eventos para Amazon MSK, Apache Kafka autoadministrado, Amazon DocumentDB y Amazon MQ para ActiveMQ y RabbitMQ solo admiten funciones con límites de tiempo de espera máximos de 14 minutos. Esta restricción garantiza que la asignación de orígenes de eventos pueda gestionar correctamente los errores y reintentos de las funciones.

Lambda lee los mensajes secuencialmente para cada partición. Una sola carga de Lambda puede contener mensajes de varias particiones. Después de que Lambda procese cada lote, confirma los desplazamientos de los mensajes en ese lote. Si su función devuelve un error para cualquiera de los mensajes de un lote, Lambda reintenta todo el lote de mensajes hasta que el procesamiento sea correcto o los mensajes caduquen.

aviso

Las asignaciones de orígenes de eventos de Lambda procesan cada evento al menos una vez y puede producirse un procesamiento duplicado de registros. Para evitar posibles problemas relacionados con la duplicación de eventos, le recomendamos encarecidamente que haga que el código de la función sea idempotente. Para obtener más información, consulte ¿Cómo puedo hacer que mi función de Lambda sea idempotente? en el Centro de conocimientos de AWS.

Para ver un ejemplo de cómo configurar Amazon MSK como fuente de eventos, consulte Uso de Amazon MSK como fuente de eventos AWS Lambda en el Blog de informática de AWS. Para obtener un tutorial completo, consulte Amazon MSK Lambda Integration (Integración de Amazon MSK y Lambda) en Amazon MSK Labs.

Evento de ejemplo

Lambda envía el lote de mensajes en el parámetro de evento cuando invoca su función. La carga de eventos contiene una matriz de mensajes. Cada elemento de matriz contiene detalles del tema y el identificador de partición de Amazon MSK, junto con una marca de hora y un mensaje codificado en base64.

{ "eventSource":"aws:kafka", "eventSourceArn":"arn:aws:kafka:sa-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers":"b-2.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092,b-1.demo-cluster-1.a1bcde.c1.kafka.us-east-1.amazonaws.com:9092", "records":{ "mytopic-0":[ { "topic":"mytopic", "partition":0, "offset":15, "timestamp":1545084650987, "timestampType":"CREATE_TIME", "key":"abcDEFghiJKLmnoPQRstuVWXyz1234==", "value":"SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "headers":[ { "headerKey":[ 104, 101, 97, 100, 101, 114, 86, 97, 108, 117, 101 ] } ] } ] } }

Autenticación de clústeres de MSK

Lambda necesita permiso para acceder al clúster de Amazon MSK, recuperar registros y llevar a cabo otras tareas. Amazon MSK admite varias opciones para controlar el acceso de los clientes al clúster de MSK.

Acceso sin autenticar

Si ningún cliente accede al clúster a través de Internet, puede utilizar el acceso no autenticado.

Autenticación SASL/SCRAM

Amazon MSK admite autenticación simple y autenticación de capa de seguridad/mecanismo de autenticación de respuesta por desafío saltado (SASL/SCRAM) con cifrado de seguridad de la capa de transporte (TLS). Para que Lambda se conecte al clúster, las credenciales de autenticación (nombre de usuario y contraseña) se almacenan en un secreto de AWS Secrets Manager.

Para obtener más información sobre Secrets Manager, consulte Autenticación de usuario y contraseña con AWS Secrets Manager (en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

Amazon MSK no admite la autenticación SASL/PLAIN.

Autenticación basada en roles de IAM

Puede utilizar IAM para autenticar la identidad de los clientes que se conectan al clúster de MSK. Si la autenticación de IAM está activa en el clúster de MSK y no proporciona un secreto para la autenticación, Lambda utilizará de forma automática la autenticación de IAM. Para crear e implementar políticas basadas en roles o usuarios, utilice la API o la consola de IAM. Para obtener más información, consulte IAM access control (Control de acceso de IAM) en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

Para permitir que Lambda se conecte al clúster de MSK, lea registros y lleve a cabo otras acciones necesarias, agregue los siguientes permisos al rol de ejecución de la función.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/cluster-name/cluster-uuid", "arn:aws:kafka:region:account-id:topic/cluster-name/cluster-uuid/topic-name", "arn:aws:kafka:region:account-id:group/cluster-name/cluster-uuid/consumer-group-id" ] } ] }

Puede asignar estos permisos a un clúster, un tema y un grupo específicos. Para obtener más información, consulte Acciones de Amazon MSK para Kafka en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

Autenticación TLS mutua

TLS mutua (mTLS) proporciona autenticación bidireccional entre el cliente y el servidor. El cliente envía un certificado al servidor para que el servidor verifique el cliente, mientras que el servidor envía un certificado al cliente para que el cliente verifique el servidor.

En el caso de Amazon MSK, Lambda actúa como cliente. Puede configurar un certificado de cliente (como secreto en Secrets Manager) para autenticar a Lambda con los agentes del clúster de MSK. El certificado de cliente debe estar firmado por una entidad de certificación en el almacén de confianza del servidor. El clúster de MSK envía un certificado de servidor a Lambda para autenticar a los agentes con Lambda. El certificado de servidor debe estar firmado por una entidad de certificación que esté en el almacén de confianza de AWS.

Para obtener instrucciones sobre cómo generar un certificado de cliente, consulte Introducing mutual TLS authentication for Amazon MSK as an event source (Presentación de la autenticación con TLS mutua para Amazon MSK como origen de eventos).

Amazon MSK no admite certificados de servidor autofirmados porque todos los agentes de Amazon MSK utilizan certificados públicos firmados por entidades de certificación de Amazon Trust Services, en los que Lambda confía de forma predeterminada.

Para obtener más información sobre mTLS para Amazon MSK, consulte Mutual TLS Authentication (Autenticación con TLS mutua) en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

Configuración del secreto de mTLS

El secreto CLIENT_CERTIFICATE_TLS_AUTH requiere un campo de certificado y un campo de clave privada. Para una clave privada cifrada, el secreto requiere una contraseña de clave privada. El certificado y la clave privada deben estar en formato PEM.

nota

Lambda admite los algoritmos de cifrado de claves privadas PBES1 (pero no PBES2).

El campo de certificado debe contener una lista de certificados y debe comenzar por el certificado de cliente, seguido de cualquier certificado intermedio, y finalizar con el certificado raíz. Cada certificado debe comenzar en una nueva línea con la siguiente estructura:

-----BEGIN CERTIFICATE----- <certificate contents> -----END CERTIFICATE-----

Secrets Manager admite secretos de hasta 65 536 bytes, que supone suficiente espacio para cadenas de certificados largas.

El formato de la clave privada debe ser PKCS #8, con la siguiente estructura:

-----BEGIN PRIVATE KEY----- <private key contents> -----END PRIVATE KEY-----

Para una clave privada cifrada, utilice la siguiente estructura:

-----BEGIN ENCRYPTED PRIVATE KEY----- <private key contents> -----END ENCRYPTED PRIVATE KEY-----

El siguiente ejemplo muestra el contenido de un secreto para la autenticación de mTLS mediante una clave privada cifrada. Para una clave privada cifrada, incluya la contraseña de la clave privada en el secreto.

{ "privateKeyPassword": "testpassword", "certificate": "-----BEGIN CERTIFICATE----- MIIE5DCCAsygAwIBAgIRAPJdwaFaNRrytHBto0j5BA0wDQYJKoZIhvcNAQELBQAw ... j0Lh4/+1HfgyE2KlmII36dg4IMzNjAFEBZiCRoPimO40s1cRqtFHXoal0QQbIlxk cmUuiAii9R0= -----END CERTIFICATE----- -----BEGIN CERTIFICATE----- MIIFgjCCA2qgAwIBAgIQdjNZd6uFf9hbNC5RdfmHrzANBgkqhkiG9w0BAQsFADBb ... rQoiowbbk5wXCheYSANQIfTZ6weQTgiCHCCbuuMKNVS95FkXm0vqVD/YpXKwA/no c8PH3PSoAaRwMMgOSA2ALJvbRz8mpg== -----END CERTIFICATE-----", "privateKey": "-----BEGIN ENCRYPTED PRIVATE KEY----- MIIFKzBVBgkqhkiG9w0BBQ0wSDAnBgkqhkiG9w0BBQwwGgQUiAFcK5hT/X7Kjmgp ... QrSekqF+kWzmB6nAfSzgO9IaoAaytLvNgGTckWeUkWn/V0Ck+LdGUXzAC4RxZnoQ zp2mwJn2NYB7AZ7+imp0azDZb+8YG2aUCiyqb6PnnA== -----END ENCRYPTED PRIVATE KEY-----" }

Cómo Lambda elige un agente de arranque

Lambda elige un agente de arranque en función de los métodos de autenticación disponibles en el clúster y de si proporciona un secreto para la autenticación. Si proporciona un secreto para mTLS o SASL/SCRAM, Lambda elige de forma automática ese método de autenticación. Si no proporciona ningún secreto, Lambda selecciona el método de autenticación más seguro que esté activo en el clúster. El siguiente es el orden de prioridad en el que Lambda selecciona un agente, de la autenticación más segura a la menos segura:

  • mTLS (secreto proporcionado para mTLS)

  • SASL/SCRAM (secreto proporcionado para SASL/SCRAM)

  • SASL IAM (no se proporciona secreto y la autenticación de IAM está activa)

  • TLS no autenticada (no se proporciona secreto y la autenticación de IAM no está activa)

  • Texto sin formato (no se proporciona secreto y tanto la autenticación de IAM como la TLS no autenticada no están activas)

nota

Si Lambda no puede conectarse al tipo de agente más seguro, no intentará conectarse a un tipo de agente diferente (menos seguro). Si quiere que Lambda elija un tipo de agente más débil, desactive todos los métodos de autenticación más seguros del clúster.

Administración del acceso de la API y los permisos

Además de acceder al clúster de Amazon MSK, la función necesita permisos para llevar a cabo varias acciones de la API de Amazon MSK. Los permisos se agregan al rol de ejecución de la función. Si los usuarios tienen que acceder a cualquier acción de la API de Amazon MSK, agregue los permisos necesarios a la política de identidad para el usuario o rol.

Puede agregar cada uno de los siguientes permisos a su rol de ejecución de forma manual. Como alternativa, puede adjuntar la política administrada por AWS AWSLambdaMSKExecutionRole a su rol de ejecución. La política AWSLambdaMSKExecutionRole contiene todas las acciones de API y los permisos de VPC necesarios que se enumeran a continuación.

Permisos de rol de ejecución de la función de Lambda necesarios

Para crear y almacenar registros en un grupo de registros en Registros de Amazon CloudWatch, la función de Lambda debe tener los siguientes permisos en su rol de ejecución:

Para que Lambda acceda al clúster de Amazon MSK en su nombre, la función de Lambda debe tener los siguientes permisos en su rol de ejecución:

Solo tiene que agregar el permiso kafka:DescribeCluster o el kafka:DescribeClusterV2. En el caso de los clústeres de MSK aprovisionados, cualquiera de los dos permisos funciona. Para los clústeres de MSK sin servidor, debe utilizar kafka:DescribeClusterV2.

nota

Lambda planea eliminar en su momento el permiso kafka:DescribeCluster de la política administrada asociada AWSLambdaMSKExecutionRole. Si utiliza esta política, debería migrar cualquier aplicación con kafka:DescribeCluster para utilizar kafka:DescribeClusterV2 en su lugar.

Permisos de VPC

Si solo los usuarios dentro de una VPC pueden acceder a su clúster de Amazon MSK, su función de Lambda debe tener permiso para acceder a sus recursos de Amazon VPC. Estos recursos incluyen su VPC, subredes, grupos de seguridad e interfaces de red. Para acceder a estos recursos, el rol de ejecución de la función debe tener los siguientes permisos. Estos permisos están incluidos en la política administrada por AWS AWSLambdaMSKExecutionRole.

Permisos de función de Lambda opcionales

Es posible que la función de Lambda también necesite permisos para:

  • Acceda a su secreto de SCRAM, mediante la autenticación SASL/SCRAM.

  • Describir el secreto de Secrets Manager.

  • Acceder a su clave administrada por el cliente de AWS Key Management Service (AWS KMS).

  • Envíe los registros de las invocaciones fallidas a un destino.

Secrets Manager y permisos de AWS KMS

En función del tipo de control de acceso que configure para los agentes de Amazon MSK, es posible que la función de Lambda necesite permiso para acceder a su secreto de SCRAM (si utiliza autenticación SASL/SCRAM) o al secreto de Secrets Manager para descifrar su clave administrada por el cliente de AWS KMS. Para acceder a estos recursos, el rol de ejecución de la función debe tener los siguientes permisos:

Envío de registros a un destino

Si desea enviar registros de invocaciones fallidas a un destino en caso de error, la función de Lambda debe tener permiso para enviar estos registros. Para las asignaciones de orígenes de eventos de Kafka, puede elegir entre un tema de Amazon SNS, una cola de Amazon SQS o un bucket de Amazon S3 como destino. Para enviar registros a un tema de SNS, el rol de ejecución de la función debe tener el siguiente permiso:

Para enviar registros a una cola de SQS, el rol de ejecución de la función debe tener el siguiente permiso:

Para enviar registros a un bucket de S3, el rol de ejecución de la función debe tener los siguientes permisos:

Además, si configuró una clave de KMS en su destino, Lambda necesita los siguientes permisos según el tipo de destino:

  • Si tiene activado el cifrado con su propia clave de KMS para un destino de S3, necesitará kms:GenerateDataKey. Si la clave de KMS y el destino del bucket de S3 están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:GenerateDataKey.

  • Si tiene activado el cifrado con su propia clave de KMS para un destino de SQS, necesitará kms:Decrypt y kms:GenerateDataKey. Si la clave de KMS y el destino de la cola de SQS están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey y kms:ReEncrypt.

  • Si tiene activado el cifrado con su propia clave de KMS para un destino de SNS, necesitará kms:Decrypt y kms:GenerateDataKey. Si la clave de KMS y el destino del tema de SNS están en una cuenta diferente a la de su función de Lambda y rol de ejecución, configure la clave de KMS para que confíe en el rol de ejecución y permita kms:Decrypt, kms:GenerateDataKey, kms:DescribeKey y kms:ReEncrypt.

Adición de permisos a su rol de ejecución

Siga estos pasos para agregar la política administrada por AWS AWSLambdaMSKExecutionRole a su rol de ejecución mediante la consola de IAM.

Cómo agregar una política administrada de AWS
  1. Abra la página Policies (Políticas) en la consola de IAM.

  2. En el cuadro de búsqueda, escriba el nombre de la política (AWSLambdaMSKExecutionRole).

  3. Seleccione la política de la lista y, a continuación, elija Acciones de política, Adjuntar.

  4. En la página Attach policy (Asociar política), seleccione su rol de ejecución en la lista y, a continuación, elija Attach policy (Asociar política).

Concesión de acceso a los usuarios con una política de IAM

De forma predeterminada, los usuarios y roles no tienen permiso para llevar a cabo operaciones de API de Amazon MSK. Para conceder acceso a los usuarios de su organización o cuenta, puede agregar o actualizar la política basada en identidades. Para obtener más información, consulte Ejemplos de políticas basadas en identidades de Amazon MSK en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

Errores de autenticación y autorización

Si falta alguno de los permisos necesarios para consumir datos del clúster de Amazon MSK, Lambda muestra uno de los siguientes mensajes de error en la asignación del origen de eventos en LastProcessingResult.

Error de autorización del clúster a Lambda

Para SASL/SCRAM o mTLS, este error indica que el usuario proporcionado no tiene todos los permisos de lista de control de acceso (ACL) de Kafka necesarios que se indican a continuación:

  • Clúster DescribeConfigs

  • Descripción del grupo

  • Grupo de lectura

  • Descripción del tema

  • Tema de lectura

Para el control de acceso de IAM, faltan uno o más de los permisos necesarios en el rol de ejecución de la función para acceder al grupo o al tema. Consulte la lista de permisos necesarios en Autenticación basada en roles de IAM.

Cuando crea las ACL de Kafka o una política de IAM con los permisos de clúster de Kafka necesarios, debe especificar el tema y el grupo como recursos. El nombre del tema debe coincidir con el tema en la asignación de origen de eventos. El nombre del grupo debe coincidir con el UUID de la asignación de origen de eventos.

Después de agregar los permisos necesarios al rol de ejecución, pueden pasar varios minutos hasta que los cambios surtan efecto.

Error de autenticación de SASL

Para SASL/SCRAM, este error indica que el nombre de usuario y la contraseña proporcionados no son válidos.

Para el control de acceso de IAM, falta el permiso kafka-cluster:Connect para el clúster de MSK en el rol de ejecución. Agregue este permiso al rol y especifique el nombre de recurso de Amazon (ARN) del clúster como recurso.

Es posible que vea que este error se produce de forma intermitente. El clúster rechaza las conexiones después de que el número de conexiones TCP supere la cuota de servicio de Amazon MSK. Lambda retrocede y vuelve a intentarlo hasta que una conexión tenga éxito. Después de que Lambda se conecte al clúster y sondee los registros, el último resultado de procesamiento cambia a OK.

El servidor no pudo autenticar Lambda

Este error indica que los agentes de Kafka de Amazon MSK no se han podido autenticar con Lambda. Este error puede producirse por cualquiera de las razones siguientes:

  • No proporcionó ningún certificado de cliente para la autenticación de mTLS.

  • Proporcionó un certificado de cliente, pero los agentes no están configurados para utilizar mTLS.

  • Los agentes no confían en el certificado de cliente.

La clave privada o el certificado proporcionados no son válidos

Este error indica que el consumidor de Amazon MSK no ha podido utilizar el certificado ni la clave privada proporcionados. Asegúrese de que el formato del certificado y la clave sea PEM y de que el cifrado de clave privada utilice un algoritmo PBES1.

Configuración de red

Para que Lambda utilice su clúster de Kafka como fuente de eventos, necesita acceder a la Amazon VCP en la que reside su clúster. Le recomendamos que implemente puntos de conexión de VPC AWS PrivateLink para que Lambda acceda a su VPC. Implemente puntos de conexión para Lambda y AWS Security Token Service (AWS STS). Si el agente utiliza autenticación, implemente también un punto de conexión de VPC para Secrets Manager. Si configuró un destino en caso de error, implemente también un punto de conexión de VPC para el servicio de destino.

De manera alternativa, asegúrese de que la VPC asociada a su clúster de Kafka incluya una gateway NAT por subred pública. Para obtener más información, consulte Habilitar el acceso a Internet para funciones de Lambda conectadas a VPC.

Si usa puntos de enlace de VPC, también debe configurarlos para habilitar los nombres DNS privados.

Al crear una asignación de orígenes de eventos para un clúster de MSK, Lambda comprueba si las interfaces de red elásticas (ENI) ya están presentes en las subredes y los grupos de seguridad de su VPC del clúster. Si Lambda encuentra los ENI existentes, intenta reutilizarlos. De lo contrario, Lambda crea nuevos ENI para conectarse a la fuente del evento e invocar la función.

nota

Las funciones de Lambda siempre se ejecutan dentro de VPC propiedad del servicio de Lambda. El servicio mantiene estas VPC de forma automática y no son visibles para los clientes. También puede conectar su función a una Amazon VPC. En cualquier caso, la configuración de VPC de la función no afecta a la asignación de orígenes de eventos. Solo la configuración de la VPC de la fuente de eventos determina cómo Lambda se conecta a la fuente de eventos.

La configuración de Amazon VPC se puede detectar a través de la API de Amazon MSK. No tiene que configurarla durante la configuración con el comando create-event-source-mapping.

Para obtener más información sobre la configuración de la red, consulte Configuración de AWS Lambda con un clúster Apache Kafka dentro de una VPC en el Blog de informática de AWS.

Reglas de los grupos de seguridad de VPC

Configure sus grupos de seguridad de Amazon VPC que contienen su clúster con las siguientes reglas (como mínimo):

  • Reglas de entrada: permiten todo el tráfico en el puerto del agente de Amazon MSK (9092 para texto sin formato, 9094 para TLS, 9096 para SASL, 9098 para IAM) para los grupos de seguridad especificados para el origen de eventos.

  • Reglas de salida: permiten todo el tráfico en el puerto 443 para todos los destinos. Permiten todo el tráfico en el puerto del agente de Amazon MSK (9092 para texto sin formato, 9094 para TLS, 9096 para SASL, 9098 para IAM) para los grupos de seguridad especificados para el origen de eventos.

  • Si utiliza puntos de conexión de VPC en lugar de una puerta de enlace NAT, los grupos de seguridad asociados a los puntos de conexión de VPC deben permitir todo el tráfico entrante en el puerto 443 desde los grupos de seguridad del origen de eventos.

Uso de los puntos de enlace de la VPC

Cuando utiliza puntos de conexión de VPC, las llamadas a la API para invocar su función se enrutan a través de estos puntos de conexión mediante los ENI. La entidad principal del servicio de Lambda debe llamar a sts:AssumeRole y lambda:InvokeFunction para cualquier rol y función que utilice esos ENI.

De forma predeterminada, los puntos de conexión de VPC tienen políticas de IAM abiertas. La práctica recomendada es restringir estas políticas para permitir que solo entidades principales específicas realicen las acciones necesarias utilizando ese punto de conexión. Para garantizar que la asignación de orígenes de eventos pueda invocar la función de Lambda, la política de puntos de conexión de VPC debe permitir que el principio del servicio de Lambda llame a sts:AssumeRole y lambda:InvokeFunction. Restringir las políticas de puntos de conexión de VPC para permitir únicamente las llamadas a la API que se originen en su organización impide que la asignación de orígenes de eventos funcione correctamente.

En el siguiente ejemplo de políticas de puntos de conexión de VPC, se muestra cómo conceder el acceso necesario a las entidades principales del servicio de Lambda para AWS STS y los puntos de conexión de Lambda.

ejemplo Política del punto de conexión de VPC: punto de conexión de AWS STS
{ "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
ejemplo Política del punto de conexión de VPC: punto de conexión de Lambda
{ "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }

Si su agente de Kafka utiliza la autenticación, también puede restringir la política de puntos de conexión de VPC para el punto de conexión de Secrets Manager. Para llamar a la API de Secrets Manager, Lambda usa su rol de función, no la entidad principal de servicio de Lambda. En el siguiente ejemplo, se muestra una política del punto de conexión de Secrets Manager.

ejemplo Política de punto de conexión de VPC: punto de conexión de Secrets Manager
{ "Statement": [ { "Action": "secretsmanager:GetSecretValue", "Effect": "Allow", "Principal": { "AWS": [ "customer_function_execution_role_arn" ] }, "Resource": "customer_secret_arn" } ] }

Si tiene configurado un destino en caso de fallo, Lambda también utiliza el rol de su función para llamar a s3:PutObject, sns:Publish o sqs:sendMessage mediante los ENI administrados por Lambda.

Agregar Amazon MSK como fuente de eventos

Para crear una asignación de orígenes de eventos, agregue Amazon MSK como un desencadenador de la función de Lambda a través de la consola de Lambda, un AWS SDK, o el AWS Command Line Interface (AWS CLI). Tenga en cuenta que cuando agrega Amazon MSK como desencadenador, Lambda asume la configuración de VPC del clúster de Amazon MSK, no la configuración de VPC de la función de Lambda.

En esta sección se describe cómo crear una asignación de orígenes de eventos mediante la consola de Lambda y la AWS CLI.

Requisitos previos

  • Un clúster Amazon MSK y un tema Kafka. Para obtener más información, consulte Introducción al uso de Amazon MSK en la Guía para desarrolladores de Amazon Managed Streaming for Apache Kafka.

  • Un rol de ejecución con permiso para acceder a los recursos de AWS que utiliza el clúster de MSK.

ID del grupo de consumidores personalizable

Al configurar Kafka como origen de eventos, puede especificar un ID de grupo de consumidores. Este ID de grupo de consumidores es un identificador existente para el grupo de consumidores de Kafka al que desea que se una la función de Lambda. Puede utilizar esta característica para migrar sin problemas cualquier configuración de procesamiento de registro de Kafka en curso de otros consumidores a Lambda.

Si especifica un ID de grupo de consumidores y hay otros sondeadores activos dentro de ese grupo de consumidores, Kafka distribuirá los mensajes entre todos los consumidores. En otras palabras, Lambda no recibe todos los mensajes del tema Kafka. Si desea que Lambda gestione todos los mensajes del tema, desactive los demás sondeadores de ese grupo de consumidores.

Además, si especifica un ID de grupo de consumidores y Kafka encuentra un grupo de consumidores existente válido con el mismo ID, Lambda ignora el parámetro StartingPosition para la asignación de orígenes de eventos. En cambio, Lambda comienza a procesar los registros de acuerdo con la compensación comprometida del grupo de consumidores. Si especifica un ID de grupo de consumidores y Kafka no puede encontrar un grupo de consumidores existente, Lambda configura el origen de eventos con el StartingPosition especificado.

El ID del grupo de consumidores que especifique debe ser único entre todos los orígenes de eventos de Kafka. Tras crear una asignación de orígenes de eventos de Kafka con el ID de grupo de consumidores especificado, no puede actualizar este valor.

Destinos en caso de error

Para retener los registros de las invocaciones fallidas o de las cargas útiles sobredimensionadas de su origen de eventos de Kafka, configure un destino en caso de error para su función. Cuando se produce un error en una invocación, Lambda envía un registro JSON que contiene detalles de la invocación a su destino.

Puede elegir entre un tema de Amazon SNS, una cola de Amazon SQS o un bucket de Amazon S3 como su destino. Para los destinos de temas de SNS o colas de SQS, Lambda envía los metadatos del registro al destino. Para buckets de S3 de destino, Lambda envía todo el registro de invocación junto con los metadatos al destino.

Para que Lambda envíe correctamente los registros al destino elegido, asegúrese de que el rol de ejecución de la función contenga los permisos pertinentes. En la tabla también se describe cómo cada tipo de destino recibe el registro de invocación de JSON.

Tipo de destino Compatible con los siguientes orígenes de eventos Permisos necesarios Formato JSON específico del destino

Cola de Amazon SQS

  • Kinesis

  • DynamoDB

  • Apache Kafka autoadministrado y Apache Kafka administrado

Lambda pasa los metadatos del registro de invocación como Message al destino.

Tema de Amazon SNS

  • Kinesis

  • DynamoDB

  • Apache Kafka autoadministrado y Apache Kafka administrado

Lambda pasa los metadatos del registro de invocación como Message al destino.

Bucket de Amazon S3

  • Apache Kafka autoadministrado y Apache Kafka administrado

Lambda almacena el registro de invocación junto con sus metadatos en el destino.

sugerencia

Como práctica recomendada, incluya los permisos mínimos requeridos solo en su rol de ejecución.

Destinos SNS y SQS

El siguiente ejemplo muestra lo que Lambda envía a un destino de tema de SNS o cola de SQS cuando se produce un error en la invocación de un origen de eventos de Kafka. Cada una de las claves en recordsInfo contiene el tema y la partición de Kafka, separados por un guion. Por ejemplo, para la clave "Topic-0", Topic es el tema de Kafka, y 0 es la partición. Para cada tema y partición, puede usar los desplazamientos y los datos de las marcas de tiempo para buscar los registros de invocación originales.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } } }

Destinos de S3

Para los destinos de S3, Lambda envía todo el registro de invocación junto con los metadatos al destino. El siguiente ejemplo muestra lo que Lambda envía a un bucket de S3 de destino cuando se produce un error en la invocación de un origen de evento de Kafka. Además de todos los campos del ejemplo anterior para los destinos de SQS y SNS, el campo payload contiene el registro de invocación original en forma de cadena JSON de escape.

{ "requestContext": { "requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81", "functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction", "condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded", "approximateInvokeCount": 1 }, "responseContext": { // null if record is MaximumPayloadSizeExceeded "statusCode": 200, "executedVersion": "$LATEST", "functionError": "Unhandled" }, "version": "1.0", "timestamp": "2019-11-14T00:38:06.021Z", "KafkaBatchInfo": { "batchSize": 500, "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2", "bootstrapServers": "...", "payloadSize": 2039086, // In bytes "recordsInfo": { "Topic-0": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", }, "Topic-1": { "firstRecordOffset": "49601189658422359378836298521827638475320189012309704722", "lastRecordOffset": "49601189658422359378836298522902373528957594348623495186", "firstRecordTimestamp": "2019-11-14T00:38:04.835Z", "lastRecordTimestamp": "2019-11-14T00:38:05.580Z", } } }, "payload": "<Whole Event>" // Only available in S3 }
sugerencia

También se recomienda habilitar el control de versiones de S3 en el bucket de destino.

Configuración de destinos en caso de error

Para configurar un destino en caso de error mediante la consola, siga estos pasos:

  1. Abra la página de Funciones en la consola de Lambda.

  2. Elija una función.

  3. En Descripción general de la función, elija Agregar destino.

  4. En Origen, elija Invocación de asignación de orígenes de eventos.

  5. Para la Asignación de orígenes de eventos, elija un origen de eventos que esté configurado para esta función.

  6. En Condición, seleccione En caso de error. Para las invocaciones de asignación de orígenes de eventos, esta es la única condición aceptada.

  7. En Tipo de destino, elija el tipo de destino al que Lambda envía los registros de invocación.

  8. En Destino, elija un recurso.

  9. Elija Guardar.

También puede configurar un destino en caso de error mediante la API de Lambda. Por ejemplo, el siguiente comando de la CLI CreateEventSourceMapping agrega un destino de SQS en caso de error a MyFunction:

aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'

El siguiente comando de la CLI UpdateEventSourceMapping agrega un destino S3 en caso de error al origen de eventos de Kafka asociado a la entrada uuid:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'

Para eliminar un destino, introduzca una cadena vacía como argumento del parámetro destination-config:

aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'

Agregar un desencadenador de Amazon MSK (consola)

Siga estos pasos para agregar su clúster de Amazon MSK y un tema Kafka como desencadenador de su función de la función de Lambda.

Cómo agregar un desencadenador Amazon MSK a su función de Lambda (consola)
  1. Abra la página de Functions (Funciones) en la consola de Lambda.

  2. Elija el nombre de su función de Lambda.

  3. En Descripción general de la función, elija Agregar desencadenador.

  4. En Configuración del desencadenador, haga lo siguiente:

    1. Elija el tipo de desencadenador MSK.

    2. Para clúster de MSK, seleccione su clúster.

    3. Para el Tamaño del lote, establezca el número máximo de mensajes para recibir un solo lote.

    4. Para el periodo de lotes, ingrese la cantidad máxima de segundos que Lambda emplea a fin de recopilar registros antes de invocar la función.

    5. Para Nombre de tema, escriba un nombre para el tema Kafka.

    6. (Opcional) Para el ID del grupo de consumidores, ingrese el ID de un grupo de consumidores de Kafka al que unirse.

    7. (Opcional) En Posición inicial, elija Última para empezar a leer el flujo desde el registro más reciente, Horizonte de supresión para comenzar por el registro más antiguo disponible o En la marca de tiempo para especificar una marca de tiempo desde la cual comenzar a leer.

    8. (Opcional) En Authentication (Autenticación), elija la clave secreta para la autenticación con los agentes en el clúster de MSK.

    9. Para crear el desencadenador en un estado deshabilitado para la prueba (recomendado), desactive Activar desencadenador. O bien, para habilitar el desencadenador de inmediato, seleccioneActivar desencadenador.

  5. Para crear el desencadenador, elija Add (Añadir).

Agregar un desencadenador de Amazon MSK (AWS CLI)

Utilice los siguientes comandos AWS CLI de ejemplo para crear y ver un desencadenador Amazon MSK para su función de a función Lambda.

Crear un desencadenador mediante el AWS CLI

ejemplo — Creación de una asignación de orígenes de eventos para un clúster que utilice la autenticación de IAM

El siguiente ejemplo utiliza el comando create-event-source-mapping AWS CLI para mapear una función de Lambda llamada my-kafka-function a un tema de Kafka llamado AWSKafkaTopic. La posición inicial del tema está establecida en LATEST. Cuando el clúster utiliza la autenticación basada en roles de IAM, no se necesita un objeto SourceAccessConfiguration. Ejemplo:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
ejemplo — Creación de una asignación de orígenes de eventos para un clúster que utilice la autenticación SASL/SCRAM

Si el clúster usa la autenticación SASL/SCRAM, debe incluir un objeto SourceAccessConfiguration que especifique SASL_SCRAM_512_AUTH y un ARN secreto de Secrets Manager.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
ejemplo — Creación de una asignación de orígenes de eventos para un clúster que utilice la autenticación mTLS

Si el clúster usa la autenticación mTLS, debe incluir un objeto SourceAccessConfiguration que especifique CLIENT_CERTIFICATE_TLS_AUTH y un ARN secreto de Secrets Manager.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

Para obtener más información, consulte la documentación de referencia de la API CreateEventSourceMapping.

Visualización del estado mediante la AWS CLI

El siguiente ejemplo utiliza el comando get-event-source-mapping AWS CLI para describir el estado de la asignación de orígenes de eventos que ha creado.

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Creación de asignaciones de orígenes de eventos entre cuentas

Puede utilizar la conectividad privada de varias VPC para conectar una función de Lambda a un clúster de MSK aprovisionado en otra Cuenta de AWS. La conectividad de varias VPC utiliza AWS PrivateLink, que mantiene todo el tráfico dentro de la red de AWS.

nota

No puede crear asignaciones de orígenes de eventos entre cuentas para clústeres de MSK sin servidor.

Para crear una asignación de orígenes de eventos entre cuentas, primero debe configurar la conectividad de múltiples VPC para el clúster de MSK. Al crear la asignación de orígenes de eventos, utilice el ARN de conexión de VPC administrada en lugar del ARN del clúster, como se muestra en los siguientes ejemplos. La operación CreateEventSourceMapping también varía según el tipo de autenticación que utilice el clúster de MSK.

ejemplo — Creación de una asignación de orígenes de eventos entre cuentas para un clúster que utilice la autenticación de IAM

Cuando el clúster utiliza la autenticación basada en roles de IAM, no se necesita un objeto SourceAccessConfiguration. Ejemplo:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
ejemplo — Creación de una asignación de orígenes de eventos entre cuentas para un clúster que utilice la autenticación SASL/SCRAM

Si el clúster usa la autenticación SASL/SCRAM, debe incluir un objeto SourceAccessConfiguration que especifique SASL_SCRAM_512_AUTH y un ARN secreto de Secrets Manager.

Hay dos formas de utilizar los secretos para las asignaciones de orígenes de eventos entre cuentas de Amazon MSK con autenticación SASL/SCRAM:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
ejemplo — Creación de una asignación de orígenes de eventos entre cuentas para un clúster que utilice la autenticación mTLS

Si el clúster usa la autenticación mTLS, debe incluir un objeto SourceAccessConfiguration que especifique CLIENT_CERTIFICATE_TLS_AUTH y un ARN secreto de Secrets Manager. El secreto se puede almacenar en la cuenta del clúster o en la cuenta de la función de Lambda.

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

Escalado automático de la fuente de eventos de Amazon MSK

Al crear inicialmente un origen de eventos de Amazon MSK, Lambda asigna un consumidor para procesar todas las particiones del tema de Kafka. Cada consumidor tiene varios procesadores que se ejecutan en paralelo para gestionar el aumento de las cargas de trabajo. Además, Lambda escala o reduce verticalmente de manera automática el número de consumidores, en función de la carga de trabajo. Para conservar el orden de mensajes en cada partición, el número máximo de consumidores es un consumidor por partición en el tema.

En intervalos de un minuto, Lambda evalúa el retraso de compensación del consumidor de todas las particiones del tema. Si el retraso es demasiado alto, la partición recibe mensajes más rápido de lo que Lambda puede procesarlos. Si es necesario, Lambda agrega o elimina a los consumidores del tema. El proceso de escalado para agregar o eliminar consumidores se produce dentro de los tres minutos posteriores a la evaluación.

Si su función de Lambda objetivo está limitada, Lambda reduce el número de consumidores. Esta acción reduce la carga de trabajo de la función al reducir el número de mensajes que los consumidores pueden recuperar y enviar a la función.

Para monitorear el rendimiento de su tema de Kafka, consulte la métrica de retraso de desplazamiento que Lambda emite mientras la función procesa los registros.

Para comprobar cuántas invocaciones de función se producen en paralelo, también puede supervisar las métricas de simultaneidad para su función.

Posiciones iniciales de flujos y sondeo

Tenga en cuenta que el sondeo de flujos durante la creación y las actualizaciones de la asignación de orígenes de eventos es, en última instancia, coherente.

  • Durante la creación de la asignación de orígenes de eventos, es posible que se demore varios minutos en iniciar el sondeo de los eventos del flujo.

  • Durante las actualizaciones de la asignación de orígenes de eventos, es posible que se demore varios minutos en detener y reiniciar el sondeo de los eventos del flujo.

Este comportamiento significa que, si especifica LATEST como posición inicial del flujo, la asignación de orígenes de eventos podría omitir eventos durante la creación o las actualizaciones. Para garantizar que no se pierda ningún evento, especifique la posición inicial del flujo como TRIM_HORIZON o AT_TIMESTAMP.

Métricas de Amazon CloudWatch

Lambda emite la métrica OffsetLag mientras la función procesa los registros. El valor de esta métrica es la diferencia de compensación entre el último registro escrito en el tema de origen de eventos de Kafka y el último registro que procesó el grupo de consumidores de su función. Puede utilizar OffsetLag para estimar la latencia entre el momento en el que se agrega un registro y el momento en el que el grupo lo procesa.

Una tendencia ascendente en OffsetLag puede indicar problemas con los sondeadores en el grupo de consumidores de su función. Para obtener más información, consulte Uso de métricas de funciones de Lambda.

Parámetros de configuración de Amazon MSK

Todos los tipos de fuente de eventos Lambda comparten las mismas operaciones CreateEventSourceMapping y UpdateEventSourceMapping de la API. Sin embargo, solo algunos de los parámetros se aplican a Amazon MSK.

Parámetros de fuente de eventos que se aplican a Amazon MSK
Parámetro Obligatoria Predeterminado Notas

AmazonManagedKafkaEventSourceConfig

N

Contiene el campo ConsumerGroupId, que se establece de forma predeterminada en un valor único.

Solo se puede establecer en Crear

BatchSize

N

100

Máximo: 10 000

Habilitado

N

Habilitado

EventSourceArn

Y

Solo se puede establecer en Crear

FunctionName

Y

FilterCriteria

N

Filtrado de eventos de Lambda

MaximumBatchingWindowInSeconds

N

500 ms

Comportamiento de procesamiento por lotes

SourceAccessConfigurations

N

Sin credenciales

Credenciales de autenticación de SASL/SCRAM o CLIENT_CERTIFICATE_TLS_AUTH (MutualTLS) para el origen de eventos

StartingPosition

Y

AT_TIMESTAMP, TRIM_HORIZON o LATEST

Solo se puede establecer en Crear

StartingPositionTimestamp

N

Obligatorio si StartingPosition se establece en AT_TIMESTAMP

Temas

Y

Nombre del tema de Kafka

Solo se puede establecer en Crear