Tutorial: Uso de una asignación de orígenes de eventos de Amazon MSK para invocar una función de Lambda - AWS Lambda

Tutorial: Uso de una asignación de orígenes de eventos de Amazon MSK para invocar una función de Lambda

En este tutorial, hará lo siguiente:

  • Creará una función de Lambda en la misma cuenta de AWS de un clúster existente de Amazon MSK.

  • Configurará las redes y la autenticación para que Lambda se comunique con Amazon MSK.

  • Configurará una asignación de orígenes de eventos de Amazon MSK de Lambda que ejecute la función de Lambda cuando los eventos aparezcan en el tema.

Una vez finalizados estos pasos, cuando los eventos se envíen a Amazon MSK, podrá configurar una función de Lambda para procesar dichos eventos automáticamente con su propio código de Lambda personalizado.

¿Qué puede hacer con esta característica?

Ejemplo de solución: utilice una asignación de orígenes de eventos de MSK para ofrecer puntuaciones en directo a sus clientes.

Considere el siguiente escenario: su empresa aloja una aplicación web en la que sus clientes pueden ver información sobre eventos en directo, como partidos deportivos. Las actualizaciones de información del juego se proporcionan a su equipo a través de un tema de Kafka en Amazon MSK. Desea diseñar una solución que utilice las actualizaciones del tema de MSK para ofrecer a los clientes una visión actualizada del evento en directo desde una aplicación que desarrolle. Ha optado por el siguiente enfoque de diseño: sus aplicaciones cliente se comunicarán con un backend sin servidor alojado en AWS. Los clientes se conectarán a través de sesiones de websocket mediante la API de WebSocket de Amazon API Gateway.

En esta solución, necesita un componente que lea los eventos de MSK, ejecute una lógica personalizada para preparar esos eventos para la capa de aplicación y, a continuación, reenvíe esa información a la API de API Gateway. Puede implementar este componente con AWS Lambda al proporcionar su lógica personalizada en una función de Lambda y luego llamarla con una asignación de orígenes de eventos de Amazon MSK de AWS Lambda.

Para obtener más información sobre la implementación de soluciones mediante la API de WebSocket de Amazon API Gateway, consulte los tutoriales de la API de WebSocket en la documentación de API Gateway.

Requisitos previos

Una cuenta de AWS con los siguientes recursos preconfigurados:

Para cumplir estos requisitos previos, le recomendamos que consulte la sección Getting started using Amazon MSK en la documentación de Amazon MSK.

  • Un clúster de Amazon MSK Consulte Create an Amazon MSK cluster en Getting started using Amazon MSK.

  • La siguiente configuración:

    • Asegúrese de que la autenticación basada en roles de IAM esté habilitada en la configuración de seguridad de su clúster. Esto mejora la seguridad al limitar la función de Lambda para que solo acceda a los recursos de Amazon MSK necesarios. Esto está habilitado de forma predeterminada en los nuevos clústeres de Amazon MSK.

    • Asegúrese de que el acceso público esté desactivado en la configuración de red del clúster. Restringir el acceso de su clúster de Amazon MSK a Internet mejora su seguridad al limitar el número de intermediarios que gestionan sus datos. Esto está habilitado de forma predeterminada en los nuevos clústeres de Amazon MSK.

  • Un tema de Kafka en su clúster de Amazon MSK para usarlo en esta solución. Consulte Create a topic en Getting started using Amazon MSK.

  • Un host de administración de Kafka configurado para recuperar información de su clúster de Kafka y enviar los eventos de Kafka a su tema para probarlos, como una instancia de Amazon EC2 con la CLI de administración de Kafka y la biblioteca de IAM de Amazon MSK instaladas. Consulte Create an Amazon MSK cluster en Getting started using Amazon MSK.

Una vez que haya configurado estos recursos, recopile la siguiente información de su cuenta de AWS para confirmar que puede continuar.

  • El nombre de su clúster de Amazon MSK. Puede encontrar esta información en la consola de Amazon MSK.

  • El UUID del clúster, que forma parte del ARN de su clúster de Amazon MSK, que puede encontrar en la consola de Amazon MSK. Siga los procedimientos que se indican en Listing clusters en la documentación de Amazon MSK para encontrar esta información.

  • Los grupos de seguridad asociados con su clúster de Amazon MSK. Puede encontrar esta información en la consola de Amazon MSK. En los siguientes pasos, se hará referencia a ellos como clusterSecurityGroups.

  • El identificador de la Amazon VPC que contiene su clúster de Amazon MSK. Para encontrar esta información, identifique las subredes asociadas a su clúster de Amazon MSK en la consola de Amazon MSK y, a continuación, identifique la Amazon VPC asociada a la subred en la consola de Amazon VPC.

  • El nombre del tema de Kafka utilizado en su solución. Puede encontrar esta información al llamar a su clúster de Amazon MSK con la CLI de topics de Kafka desde su host de administración de Kafka. Para obtener más información sobre la CLI de temas, consulte Adding and removing topics en la documentación de Kafka.

  • El nombre de un grupo de consumidores para su tema de Kafka, adecuado para que lo utilice su función de Lambda. Lambda puede crear este grupo automáticamente, por lo que no es necesario crearlo con la CLI de Kafka. Si necesita administrar sus grupos de consumidores, para obtener más información sobre la CLI de grupos de consumidores, consulte Managing Consumer Groups en la documentación de Kafka.

Los siguientes permisos en su cuenta de AWS:

  • Permiso para crear y administrar una función de Lambda

  • Permiso para crear políticas de IAM y asociarlas a su función de Lambda.

  • Permiso para crear puntos de conexión de Amazon VPC y modificar la configuración de red en la Amazon VPC que aloja el clúster de Amazon MSK.

Configuración de la conectividad de red para que Lambda se comunique con Amazon MSK

Use AWS PrivateLink para conectar Lambda y Amazon MSK. Para ello, puede crear puntos de conexión de Amazon VPC de interfaz en la consola de Amazon VPC. Para obtener más información acerca de la configuración de red, consulte Configuración de red.

Cuando se ejecuta una asignación de orígenes de eventos de Amazon MSK en nombre de una función de Lambda, asume el rol de ejecución de la función de Lambda. Este rol de IAM autoriza la asignación para acceder a los recursos protegidos por IAM, como su clúster de Amazon MSK. Si bien los componentes comparten un rol de ejecución, la asignación de Amazon MSK y la función de Lambda tienen requisitos de conectividad distintos para sus respectivas tareas, como se muestra en el siguiente diagrama.

Una función Lambda sondea un clúster y se comunica con Lambda mediante AWS STS.

La asignación de orígenes de eventos pertenece al grupo de seguridad de su clúster de Amazon MSK. En este paso de red, cree puntos de conexión de Amazon VPC a partir de la VPC de su clúster de Amazon MSK para conectar la asignación de orígenes de eventos a los servicios Lambda y STS. Proteja estos puntos de conexión para que acepten el tráfico del grupo de seguridad de su clúster de Amazon MSK. A continuación, ajuste los grupos de seguridad del clúster de Amazon MSK para permitir que la asignación de orígenes de eventos se comunique con el clúster de Amazon MSK.

Puede configurar los siguientes pasos que mediante la AWS Management Console.

Cómo configurar los puntos de conexión de Amazon VPC de interfaz para conectar Lambda y Amazon MSK
  1. Cree un grupo de seguridad para los puntos de conexión de Amazon VPC de interfaz, endpointSecurityGroup, que permita el tráfico TCP entrante en 443 desde clusterSecurityGroups. Siga el procedimiento descrito en Crear un grupo de seguridad en la documentación de Amazon EC2 para crear un grupo de seguridad. A continuación, siga el procedimiento descrito en Agregar reglas a un grupo de seguridad en la documentación de Amazon EC2 para agregar las reglas adecuadas.

    Cree un grupo de seguridad con la siguiente información:

    Al agregar las reglas de entrada, cree una regla para cada grupo de seguridad de clusterSecurityGroups. Para cada regla:

    • En Tipo, seleccione HTTPS.

    • En Origen, seleccione uno de los clusterSecurityGroups.

  2. Cree un punto de conexión que conecte el servicio de Lambda a la Amazon VPC que contiene su clúster de Amazon MSK. Siga el procedimiento que se indica en Crear un punto de conexión de interfaz.

    Cree un punto de conexión de interfaz con la siguiente información:

    • En Nombre del servicio, seleccione com.amazonaws.regionName.lambda, cuyo RegionName sea donde se aloja la función de Lambda.

    • Para VPC, seleccione la Amazon VPC que contiene el clúster de Amazon MSK.

    • Para Grupos de seguridad, seleccione endpointSecurityGroup, el cual creó anteriormente.

    • En Subredes, seleccione las subredes que alojan su clúster de Amazon MSK.

    • En Política, proporcione el siguiente documento de política, que protege el punto de conexión para que lo utilice la entidad principal del servicio de Lambda para la acción lambda:InvokeFunction.

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • Asegúrese de que Habilitar nombre de DNS permanezca establecido.

  3. Cree un punto de conexión que conecte el servicio AWS STS a la Amazon VPC que contiene su clúster de Amazon MSK. Siga el procedimiento que se indica en Crear un punto de conexión de interfaz.

    Cree un punto de conexión de interfaz con la siguiente información:

    • En Nombre de servicio, seleccione AWS STS.

    • Para VPC, seleccione la Amazon VPC que contiene el clúster de Amazon MSK.

    • Para Grupos de seguridad, seleccione endpointSecurityGroup.

    • En Subredes, seleccione las subredes que alojan su clúster de Amazon MSK.

    • En Política, proporcione el siguiente documento de política, que protege el punto de conexión para que lo utilice la entidad principal del servicio de Lambda para la acción sts:AssumeRole.

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • Asegúrese de que Habilitar nombre de DNS permanezca establecido.

  4. Para cada grupo de seguridad asociado a su clúster de Amazon MSK, es decir, en clusterSecurityGroups, permita lo siguiente:

    • Permita que todo el tráfico TCP entrante y saliente de 9098 llegue a todos los clusterSecurityGroups, incluso dentro del propio grupo.

    • Permita todo el tráfico TCP saliente en 443.

    Las reglas predeterminadas de los grupos de seguridad permiten parte de este tráfico, por lo que si el clúster está conectado a un único grupo de seguridad y ese grupo tiene reglas predeterminadas, no se necesitan reglas adicionales. Para ajustar las reglas de los grupos de seguridad, siga los procedimientos de Agregar reglas a un grupo de seguridad en la documentación de Amazon EC2.

    Agregue reglas a sus grupos de seguridad con la siguiente información:

    • Para cada regla de entrada o de salida del puerto 9098, proporcione lo siguiente:

      • En Type (Tipo), seleccione Custom TCP (TCP personalizada).

      • Para Intervalo de puertos, proporcione 9098.

      • En Origen, proporcione uno de los clusterSecurityGroups.

    • Para cada regla de entrada del puerto 443, en Tipo, seleccione HTTPS.

Creación de un rol de IAM para que Lambda lea un tema de Amazon MSK

Identifique los requisitos de autenticación que Lambda debe leer en su tema de Amazon MSK y, a continuación, defínalos en una política. Cree un rol, lambdaAuthRole, que autorice a Lambda a usar esos permisos. Autorice acciones en su clúster de Amazon MSK mediante las acciones de IAM kafka-cluster. A continuación, autorice a Lambda a realizar las acciones de Amazon MSK kafka y Amazon EC2 necesarias para descubrir el clúster de Amazon MSK y conectarse a él, así como las acciones de CloudWatch para que Lambda pueda registrar lo que ha hecho.

Cómo describir los requisitos de autenticación para que Lambda lea desde Amazon MSK
  1. Redacte un documento de política de IAM (un documento JSON), clusterAuthPolicy, que permita a Lambda leer el tema de Kafka en su clúster de Amazon MSK mediante su grupo de consumidores de Kafka. Lambda requiere que se configure un grupo de consumidores de Kafka para la lectura.

    Modifique la siguiente plantilla para adaptarla a sus requisitos previos:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:region:account-id:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:region:account-id:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    Para obtener más información, consulte Autenticación basada en roles de IAM. Al redactar la política:

    • Para la región y el identificador de cuenta, proporcione los que alojan su clúster de Amazon MSK.

    • Para mskClusterName, proporcione el nombre de su clúster de Amazon MSK.

    • Para cluster-uuid, proporcione el UUID del ARN de su clúster de Amazon MSK.

    • Para mskTopicName, proporcione el nombre del tema de Kafka.

    • Para mskGroupName, proporcione el nombre de su grupo de consumidores de Kafka.

  2. Identifique los permisos de Amazon MSK, Amazon EC2 y CloudWatch necesarios para que Lambda detecte y conecte su clúster de Amazon MSK a fin de registrar esos eventos.

    La política administrada AWSLambdaMSKExecutionRole define de forma permisiva los permisos necesarios. Úsela en los siguientes pasos.

    En un entorno de producción, evalúe AWSLambdaMSKExecutionRole para restringir su política de rol de ejecución según el principio de privilegio mínimo y luego escriba una política para su rol que reemplace esta política administrada.

Para obtener más información sobre el lenguaje de políticas de IAM, consulte la documentación de IAM.

Ahora que ha redactado su documento de política, cree una política de IAM para poder adjuntarla a su rol. Para ello, puede utilizar la consola de la siguiente manera.

Cómo crear una política de IAM a partir de su documento de política
  1. Inicie sesión en AWS Management Console Management Console y abra la consola IAM en https://console.aws.amazon.com/iam/.

  2. En el panel de navegación de la izquierda, elija Políticas.

  3. Elija Crear política.

  4. En la sección Editor de políticas, seleccione la opción JSON.

  5. Pegue clusterAuthPolicy.

  6. Cuando haya terminado de agregar permisos a la política, seleccione Siguiente.

  7. En la página Revisar y crear, escriba el Nombre de la política y la Descripción (opcional) para la política que está creando. Revise los Permisos definidos en esta política para ver los permisos que concede la política.

  8. Elija Crear política para guardar la nueva política.

Para obtener más información, consulte Crear políticas de IAM en la documentación de IAM.

Ahora que dispone de las políticas de IAM adecuadas, cree un rol y adjúnteselas. Para ello, puede utilizar la consola de la siguiente manera.

Para crear un rol de ejecución en la consola de IAM
  1. Abra la página Roles en la consola de IAM.

  2. Elija Crear rol.

  3. En Tipo de entidad de confianza, seleccione Servicio de AWS.

  4. En Use case (Caso de uso), elija Lambda.

  5. Elija Siguiente.

  6. Seleccione las siguientes políticas:

    • clusterAuthPolicy

    • AWSLambdaMSKExecutionRole

  7. Elija Siguiente.

  8. En Nombre de rol, escriba lambdaAuthRole y luego elija Crear rol.

Para obtener más información, consulte Definición de permisos de funciones de Lambda con un rol de ejecución.

Creación de una función de Lambda para leer su tema de Amazon MSK

Cree una función de Lambda configurada para usar su rol de IAM. Puede crear la función de Lambda con la consola.

Cómo crear una función de Lambda con su configuración de autenticación
  1. Abra la consola de Lambda y seleccione Crear función en el encabezado.

  2. Seleccione Crear desde cero.

  3. En Nombre de la función, introduzca el nombre apropiado de su elección.

  4. En Tiempo de ejecución, elija la última versión compatible de Node.js para usar el código que se proporciona en este tutorial.

  5. Elija Cambiar el rol de ejecución predeterminado.

  6. Seleccione Usar un rol existente.

  7. En Rol existente, seleccione lambdaAuthRole.

En un entorno de producción, normalmente es necesario agregar más políticas al rol de ejecución de la función de Lambda para procesar de forma significativa los eventos de Amazon MSK. Para obtener más información sobre cómo agregar políticas a su rol, consulte Agregar o eliminar permisos de identidad en la documentación de IAM.

Creación de una asignación de orígenes de eventos para la función de Lambda

La asignación de orígenes de eventos de Amazon MSK proporciona al servicio Lambda la información necesaria para invocar una función de Lambda cuando se producen los eventos de Amazon MSK adecuados. Puede crear una asignación de Amazon MSK mediante la consola. Cree un desencadenador de Lambda y, a continuación, la asignación de orígenes de eventos se configurará automáticamente.

Cómo crear un desencadenador de Lambda (y la asignación de orígenes de eventos)
  1. Navegue a la página de información general de la función de Lambda.

  2. En la sección de información general de la función, seleccione Agregar desencadenador en la parte inferior izquierda.

  3. En el menú desplegable Seleccionar un origen, seleccione Amazon MSK.

  4. No configure la autenticación.

  5. Para Clúster de MSK, seleccione el nombre de su clúster.

  6. En Tamaño del lote, ingrese 1. Este paso hace que esta característica sea más fácil de probar, pero no es un valor ideal en producción.

  7. Para Nombre del tema, escriba un nombre para el tema de Kafka.

  8. Para ID de grupo de consumidores, indique el identificador de su grupo de consumidores de Kafka.

Actualización de la función de Lambda para leer los datos de transmisión

Lambda proporciona información sobre los eventos de Kafka a través del parámetro del método event. Para ver un ejemplo de estructura de un evento de Amazon MSK, consulte Evento de ejemplo. Una vez que sepa cómo interpretar los eventos de Amazon MSK reenviados por Lambda, puede modificar el código de la función de Lambda para utilizar la información que proporcionan.

Proporcione el siguiente código a la función de Lambda para registrar el contenido de un evento de Amazon MSK de Lambda con fines de prueba:

.NET
AWS SDK for .NET
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de Amazon MSK con Lambda mediante .NET.

using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KafkaEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace MSKLambda; public class Function { /// <param name="input">The event for the Lambda function handler to process.</param> /// <param name="context">The ILambdaContext that provides methods for logging and describing the Lambda environment.</param> /// <returns></returns> public void FunctionHandler(KafkaEvent evnt, ILambdaContext context) { foreach (var record in evnt.Records) { Console.WriteLine("Key:" + record.Key); foreach (var eventRecord in record.Value) { var valueBytes = eventRecord.Value.ToArray(); var valueText = Encoding.UTF8.GetString(valueBytes); Console.WriteLine("Message:" + valueText); } } } }
Java
SDK para Java 2.x
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Uso de un evento de Amazon MSK con Lambda mediante Java.

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
JavaScript
SDK para JavaScript (v3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Uso de un evento de Amazon MSK con Lambda mediante JavaScript.

exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }
Python
SDK para Python (Boto3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de Amazon MSK con Lambda mediante Python.

import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)
Ruby
SDK para Ruby
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de Amazon MSK con Lambda mediante Ruby.

require 'base64' def lambda_handler(event:, context:) # Iterate through keys event['records'].each do |key, records| puts "Key: #{key}" # Iterate through records records.each do |record| puts "Record: #{record}" # Decode base64 msg = Base64.decode64(record['value']) puts "Message: #{msg}" end end end

Puede proporcionar un código de función de Lambda con la consola.

Cómo actualizar el código de la función de Lambda
  1. Navegue a la página de información general de la función de Lambda.

  2. Elija la pestaña Código.

  3. Introduzca el código proporcionado en el IDE del código fuente.

  4. En la barra de navegación del código fuente, seleccione Implementar.

Prueba de la función de Lambda para comprobar que está conectada al tema de Amazon MSK

Ahora puede comprobar si el origen de eventos está invocando o no su función de Lambda mediante la inspección de los registros de eventos de CloudWatch.

Cómo comprobar si se está invocando la función de Lambda
  1. Utilice su host de administración de Kafka para generar eventos de Kafka mediante la CLI kafka-console-producer. Para obtener más información, consulte Write some events into the topic en la documentación de Kafka. Envíe suficientes eventos a fin de llenar el lote definido en el tamaño del lote para la asignación de orígenes de eventos especificada en el paso anterior. Si no lo hace, Lambda esperará a que se invoque más información.

  2. Si la función se ejecuta, Lambda escribe lo ocurrido en CloudWatch. En la consola, navegue hasta la página de detalles de su función de Lambda.

  3. Seleccione la pestaña Configuración.

  4. En la barra lateral, seleccione Herramientas de monitoreo y operaciones.

  5. Identifique el grupo de registro de CloudWatch en Configuración de registro. El grupo de registro debe empezar con /aws/lambda. Elija el enlace del grupo de registro.

  6. En la consola de CloudWatch, inspeccione los eventos de registro para ver los eventos de registro que Lambda ha enviado al flujo de registro. Identifique si hay eventos de registro que contengan el mensaje de su evento de Kafka, como se muestra en la siguiente imagen. Si los hay, significa que ha conectado correctamente una función de Lambda a Amazon MSK con una asignación de orígenes de eventos de Lambda.

    Un evento de registro en CloudWatch que muestra la información del evento extraída por el código proporcionado.