Tutorial: Uso de AWS Lambda con Amazon DynamoDB Streams - AWS Lambda

Tutorial: Uso de AWS Lambda con Amazon DynamoDB Streams

En este tutorial, se crea una función de Lambda para consumir eventos de Amazon DynamoDB Stream.

Requisitos previos

En este tutorial, se presupone que tiene algunos conocimientos sobre las operaciones básicas de Lambda y la consola de Lambda. Si aún no lo ha hecho, siga las instrucciones de Cree una función de Lambda con la consola. para crear su primera función de Lambda.

Para completar los siguientes pasos, necesita la versión 2 de la AWS Command Line Interface (AWS CLI). Los comandos y la salida esperada se enumeran en bloques separados:

aws --version

Debería ver los siguientes datos de salida:

aws-cli/2.13.27 Python/3.11.6 Linux/4.14.328-248.540.amzn2.x86_64 exe/x86_64.amzn.2

Para comandos largos, se utiliza un carácter de escape (\) para dividir un comando en varias líneas.

En Linux y macOS, use su administrador de intérprete de comandos y paquetes preferido.

nota

En Windows, algunos comandos de la CLI de Bash que se utilizan habitualmente con Lambda (por ejemplo, zip) no son compatibles con los terminales integrados del sistema operativo. Para obtener una versión de Ubuntu y Bash integrada con Windows, instale el subsistema de Windows para Linux. Los comandos de la CLI de ejemplo de esta guía utilizan el formato Linux. Los comandos que incluyen documentos JSON en línea deben reformatearse si utiliza la CLI de Windows.

Creación del rol de ejecución

Cree el rol de ejecución que concederá a su función permiso para obtener acceso a los recursos de AWS.

Para crear un rol de ejecución
  1. Abra la página Roles en la consola de IAM.

  2. Elija Crear rol.

  3. Cree un rol con las propiedades siguientes.

    • Entidad de confianza: Lambda.

    • Permisos: AWSLambdaDynamoDBExecutionRole.

    • Nombre de rol: lambda-dynamodb-role.

El AWSLambdaDynamoDBExecutionRole tiene los permisos que la función necesita para leer elementos desde DynamoDB y escribir registros al CloudWatch Logs.

Creación de la función

Cree una función de Lambda que procese los eventos de DynamoDB. El código de la función escribe algunos de los datos del evento entrante en CloudWatch Logs.

.NET
AWS SDK for .NET
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de DynamoDB con Lambda mediante .NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text.Json; using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.DynamoDBEvents; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace AWSLambda_DDB; public class Function { public void FunctionHandler(DynamoDBEvent dynamoEvent, ILambdaContext context) { context.Logger.LogInformation($"Beginning to process {dynamoEvent.Records.Count} records..."); foreach (var record in dynamoEvent.Records) { context.Logger.LogInformation($"Event ID: {record.EventID}"); context.Logger.LogInformation($"Event Name: {record.EventName}"); context.Logger.LogInformation(JsonSerializer.Serialize(record)); } context.Logger.LogInformation("Stream processing complete."); } }
Go
SDK para Go V2
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de DynamoDB con Lambda mediante Go.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 package main import ( "context" "github.com/aws/aws-lambda-go/lambda" "github.com/aws/aws-lambda-go/events" "fmt" ) func HandleRequest(ctx context.Context, event events.DynamoDBEvent) (*string, error) { if len(event.Records) == 0 { return nil, fmt.Errorf("received empty event") } for _, record := range event.Records { LogDynamoDBRecord(record) } message := fmt.Sprintf("Records processed: %d", len(event.Records)) return &message, nil } func main() { lambda.Start(HandleRequest) } func LogDynamoDBRecord(record events.DynamoDBEventRecord){ fmt.Println(record.EventID) fmt.Println(record.EventName) fmt.Printf("%+v\n", record.Change) }
Java
SDK para Java 2.x
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de DynamoDB con Lambda mediante Java.

import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent; import com.amazonaws.services.lambda.runtime.events.DynamodbEvent.DynamodbStreamRecord; import com.google.gson.Gson; import com.google.gson.GsonBuilder; public class example implements RequestHandler<DynamodbEvent, Void> { private static final Gson GSON = new GsonBuilder().setPrettyPrinting().create(); @Override public Void handleRequest(DynamodbEvent event, Context context) { System.out.println(GSON.toJson(event)); event.getRecords().forEach(this::logDynamoDBRecord); return null; } private void logDynamoDBRecord(DynamodbStreamRecord record) { System.out.println(record.getEventID()); System.out.println(record.getEventName()); System.out.println("DynamoDB Record: " + GSON.toJson(record.getDynamodb())); } }
JavaScript
SDK para JavaScript (v3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de DynamoDB con Lambda mediante JavaScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); }; const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };

Consumo de un evento de DynamoDB con Lambda mediante TypeScript.

export const handler = async (event, context) => { console.log(JSON.stringify(event, null, 2)); event.Records.forEach(record => { logDynamoDBRecord(record); }); } const logDynamoDBRecord = (record) => { console.log(record.eventID); console.log(record.eventName); console.log(`DynamoDB Record: ${JSON.stringify(record.dynamodb)}`); };
PHP
SDK para PHP
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de DynamoDB con Lambda mediante PHP.

<?php # using bref/bref and bref/logger for simplicity use Bref\Context\Context; use Bref\Event\DynamoDb\DynamoDbEvent; use Bref\Event\DynamoDb\DynamoDbHandler; use Bref\Logger\StderrLogger; require __DIR__ . '/vendor/autoload.php'; class Handler extends DynamoDbHandler { private StderrLogger $logger; public function __construct(StderrLogger $logger) { $this->logger = $logger; } /** * @throws JsonException * @throws \Bref\Event\InvalidLambdaEvent */ public function handleDynamoDb(DynamoDbEvent $event, Context $context): void { $this->logger->info("Processing DynamoDb table items"); $records = $event->getRecords(); foreach ($records as $record) { $eventName = $record->getEventName(); $keys = $record->getKeys(); $old = $record->getOldImage(); $new = $record->getNewImage(); $this->logger->info("Event Name:".$eventName."\n"); $this->logger->info("Keys:". json_encode($keys)."\n"); $this->logger->info("Old Image:". json_encode($old)."\n"); $this->logger->info("New Image:". json_encode($new)); // TODO: Do interesting work based on the new data // Any exception thrown will be logged and the invocation will be marked as failed } $totalRecords = count($records); $this->logger->info("Successfully processed $totalRecords items"); } } $logger = new StderrLogger(); return new Handler($logger);
Python
SDK para Python (Boto3)
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de DynamoDB con Lambda mediante Python.

import json def lambda_handler(event, context): print(json.dumps(event, indent=2)) for record in event['Records']: log_dynamodb_record(record) def log_dynamodb_record(record): print(record['eventID']) print(record['eventName']) print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")
Ruby
SDK para Ruby
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de DynamoDB con Lambda mediante Ruby.

def lambda_handler(event:, context:) return 'received empty event' if event['Records'].empty? event['Records'].each do |record| log_dynamodb_record(record) end "Records processed: #{event['Records'].length}" end def log_dynamodb_record(record) puts record['eventID'] puts record['eventName'] puts "DynamoDB Record: #{JSON.generate(record['dynamodb'])}" end
Rust
SDK para Rust
nota

Hay más información en GitHub. Busque el ejemplo completo y aprenda a configurar y ejecutar en el repositorio de ejemplos sin servidor.

Consumo de un evento de DynamoDB con Lambda mediante Rust.

use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::dynamodb::{Event, EventRecord}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<Event>) ->Result<(), Error> { let records = &event.payload.records; tracing::info!("event payload: {:?}",records); if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_dynamo_dbrecord(record); } tracing::info!("Dynamo db records processed"); // Prepare the response Ok(()) } fn log_dynamo_dbrecord(record: &EventRecord)-> Result<(), Error>{ tracing::info!("EventId: {}", record.event_id); tracing::info!("EventName: {}", record.event_name); tracing::info!("DynamoDB Record: {:?}", record.change ); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }
Cómo crear la función
  1. Copie el código de muestra en un archivo con el nombre example.js.

  2. Cree un paquete de implementación.

    zip function.zip example.js
  3. Cree una función de Lambda con el comando create-function.

    aws lambda create-function --function-name ProcessDynamoDBRecords \ --zip-file fileb://function.zip --handler example.handler --runtime nodejs18.x \ --role arn:aws:iam::111122223333:role/lambda-dynamodb-role

Probar la función de Lambda

En este paso, invocará la función de Lambda manualmente mediante el comando invoke de la CLI de AWS Lambda y el siguiente evento de muestra de DynamoDB. Copie lo siguiente en un archivo denominado input.txt.

ejemplo input.txt
{ "Records":[ { "eventID":"1", "eventName":"INSERT", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"111", "SizeBytes":26, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"2", "eventName":"MODIFY", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "NewImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"New item!" }, "Id":{ "N":"101" } }, "SequenceNumber":"222", "SizeBytes":59, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" }, { "eventID":"3", "eventName":"REMOVE", "eventVersion":"1.0", "eventSource":"aws:dynamodb", "awsRegion":"us-east-1", "dynamodb":{ "Keys":{ "Id":{ "N":"101" } }, "OldImage":{ "Message":{ "S":"This item has changed" }, "Id":{ "N":"101" } }, "SequenceNumber":"333", "SizeBytes":38, "StreamViewType":"NEW_AND_OLD_IMAGES" }, "eventSourceARN":"stream-ARN" } ] }

Ejecute el siguiente comando de la invoke.

aws lambda invoke --function-name ProcessDynamoDBRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt

La opción cli-binary-format es obligatoria si va a utilizar la versión 2 de la AWS CLI. Para que esta sea la configuración predeterminada, ejecute aws configure set cli-binary-format raw-in-base64-out. Para obtener más información, consulte Opciones de la línea de comandos globales compatibles con AWS CLI en la Guía del usuario de la AWS Command Line Interface versión 2.

La función devuelve la cadena message en el cuerpo de la respuesta.

Verifique la salida en el archivo outputfile.txt.

Crear una tabla de DynamoDB con un flujo habilitado

Crear una tabla de Amazon DynamoDB con un flujo habilitado.

Para crear una tabla de DynamoDB
  1. Abra la consola de DynamoDB.

  2. Elija Crear tabla.

  3. Cree una tabla con la siguiente configuración:

    • Table name (Nombre de la tabla)lambda-dynamodb-stream

    • Clave principal: id (cadena)

  4. Seleccione Crear.

Habilitar secuencias
  1. Abra la consola de DynamoDB.

  2. Elija Tables (Tablas).

  3. Elija la tabla lambda-dynamodb-stream (Flujo de dynamodb lambda).

  4. En Exports and streams (Exportaciones y flujos), elija DynamoDB stream details (Detalles del flujo de DynamoDB).

  5. Elija Turn on.

  6. En Tipo de vista, elija Solo atributos clave.

  7. Seleccione Activar flujo.

Anote el ARN del flujo. Lo necesitará en el siguiente paso al asociar el flujo a la función de Lambda. Para obtener más información acerca de cómo habilitar flujos, consulte Captura de la actividad de las tablas con DynamoDB Streams.

Añadir un origen de eventos en AWS Lambda

Crear un mapeo de origen de eventos en AWS Lambda. Esta asignación de orígenes de eventos asocia el flujo de DynamoDB a la función de Lambda. Una vez creado este mapeo de origen de eventos, AWS Lambda comienza a sondear el flujo.

Ejecute el siguiente comando create-event-source-mapping de la AWS CLI. Después de ejecutar el comando, anote el UUID. Necesitará este UUID para hacer referencia al mapeo de origen de eventos en los comandos, por ejemplo, al eliminar el mapeo de origen de eventos.

aws lambda create-event-source-mapping --function-name ProcessDynamoDBRecords \ --batch-size 100 --starting-position LATEST --event-source DynamoDB-stream-arn

Esto crea un mapeo entre el flujo de DynamoDB especificado y la función de Lambda. Puede asociar un flujo de DynamoDB a varias funciones de Lambda, así como asociar la misma función de Lambda a varios flujos. Sin embargo, las funciones de Lambda compartirán el rendimiento de lectura para el flujo que comparten.

Para obtener la lista de mapeos de orígenes de eventos, ejecute el siguiente comando.

aws lambda list-event-source-mappings

La lista devuelve todos los mapeos de orígenes de eventos creados, y para cada uno de ellos muestra el LastProcessingResult, entre otras cosas. Este campo se utiliza para proporcionar un mensaje informativo en caso de que surja algún problema. Los valores como No records processed (indica que AWS Lambda no ha comenzado el sondeo o que no hay registros en el flujo) y OK (indica que AWS Lambda ha leído correctamente los registros del flujo y ha invocado la función de Lambda) indican que no hay ningún problema. Si hay algún problema, recibirá un mensaje de error.

Si tiene muchos mapeos de origen de eventos, use la función nombrar parámetro para reducir los resultados.

aws lambda list-event-source-mappings --function-name ProcessDynamoDBRecords

Prueba de la configuración

Probar la experiencia integral. A medida que se actualiza la tabla, DynamoDB escribe los registros de eventos en el flujo. Cuando AWS Lambda sondea el flujo, detecta nuevos registros en él e invoca la función de Lambda en nombre del usuario pasando eventos a esta.

  1. En la consola de DynamoDB, agregue, actualice y elimine elementos de la tabla. DynamoDB escribe registros de estas acciones en el flujo.

  2. AWS Lambda sondea el flujo y, cuando detecta actualizaciones en este, invoca la función de Lambda pasando los datos de eventos que encuentra en el flujo.

  3. La función se ejecuta y crea registros en Amazon CloudWatch. Puede verificar los registros reportados en la consola de Amazon CloudWatch.

Eliminación de sus recursos

A menos que desee conservar los recursos que creó para este tutorial, puede eliminarlos ahora. Si elimina los recursos de AWS que ya no utiliza, evitará gastos innecesarios en su Cuenta de AWS.

Cómo eliminar la función de Lambda
  1. Abra la página de Funciones en la consola de Lambda.

  2. Seleccione la función que ha creado.

  3. Elija Acciones, Eliminar.

  4. Escriba delete en el campo de entrada de texto y elija Delete(Eliminar).

Cómo eliminar el rol de ejecución
  1. Abra la página Roles en la consola de IAM.

  2. Seleccione el rol de ejecución que creó.

  3. Elija Eliminar.

  4. Si desea continuar, escriba el nombre del rol en el campo de entrada de texto y elija Delete (Eliminar).

Para eliminar la tabla de DynamoDB
  1. Abra la página Tables (Tablas) en la consola de DynamoDB.

  2. Seleccione la tabla que ha creado.

  3. Elija Eliminar.

  4. Escriba delete en el cuadro de texto.

  5. Elija Delete table (Eliminar tabla).