教程:将 AWS Lambda 与 Amazon DocumentDB 流结合使用 - AWS Lambda

教程:将 AWS Lambda 与 Amazon DocumentDB 流结合使用

在本教程中,您将创建一个基本的 Lambda 函数,该函数会消耗来自 Amazon DocumentDB(与 MongoDB 兼容)更改流的事件。要完成本教程,您需要经历以下阶段:

  • 设置 Amazon DocumentDB 集群,连接到该集群,并在其上激活更改流。

  • 创建 Lambda 函数,并将 Amazon DocumentDB 集群配置为函数的事件源。

  • 将项目插入到 Amazon DocumentDB 数据库中,以测试端到端设置。

先决条件

如果您还没有 AWS 账户,请完成以下步骤来创建一个。

注册 AWS 账户
  1. 打开 https://portal.aws.amazon.com/billing/signup

  2. 按照屏幕上的说明进行操作。

    在注册时,将接到一通电话,要求使用电话键盘输入一个验证码。

    当您注册 AWS 账户时,系统将会创建一个 AWS 账户根用户。根用户有权访问该账户中的所有 AWS 服务 和资源。作为安全最佳实践,请为用户分配管理访问权限,并且只使用根用户来执行需要根用户访问权限的任务

注册过程完成后,AWS 会向您发送一封确认电子邮件。在任何时候,您都可以通过转至 https://aws.amazon.com/ 并选择我的账户来查看当前的账户活动并管理您的账户。

注册 AWS 账户 后,请保护好您的 AWS 账户根用户,启用 AWS IAM Identity Center,并创建一个管理用户,以避免使用根用户执行日常任务。

保护您的 AWS 账户根用户
  1. 选择根用户并输入您的 AWS 账户电子邮件地址,以账户拥有者身份登录 AWS Management Console。在下一页上,输入您的密码。

    要获取使用根用户登录方面的帮助,请参阅《AWS 登录 用户指南》中的以根用户身份登录

  2. 为您的根用户启用多重身份验证 (MFA)。

    有关说明,请参阅《IAM 用户指南》中的为 AWS 账户 根用户启用虚拟 MFA 设备(控制台)

创建具有管理访问权限的用户
  1. 启用 IAM Identity Center。

    有关说明,请参阅《AWS IAM Identity Center 用户指南》中的启用 AWS IAM Identity Center

  2. 在 IAM Identity Center 中,为用户授予管理访问权限。

    有关如何使用 IAM Identity Center 目录 作为身份源的教程,请参阅《AWS IAM Identity Center 用户指南》中的使用默认的 IAM Identity Center 目录 配置用户访问权限

以具有管理访问权限的用户身份登录
  • 要使用您的 IAM Identity Center 用户身份登录,请使用您在创建 IAM Identity Center 用户时发送到您的电子邮件地址的登录网址。

    要获取使用 IAM Identity Center 用户登录方面的帮助,请参阅《AWS 登录 用户指南》中的登录 AWS 访问门户

将访问权限分配给其他用户
  1. 在 IAM Identity Center 中,创建一个权限集,该权限集遵循应用最低权限的最佳做法。

    有关说明,请参阅《AWS IAM Identity Center 用户指南》中的创建权限集

  2. 将用户分配到一个组,然后为该组分配单点登录访问权限。

    有关说明,请参阅《AWS IAM Identity Center 用户指南》中的添加组

如果您尚未安装 AWS Command Line Interface,请按照安装或更新最新版本的 AWS CLI 中的步骤进行安装。

本教程需要命令行终端或 Shell 来运行命令。在 Linux 和 macOS 中,可使用您首选的 Shell 和程序包管理器。

注意

在 Windows 中,操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令(例如 zip)。安装 Windows Subsystem for Linux,获取 Ubuntu 和 Bash 与 Windows 集成的版本。

创建 AWS Cloud9 环境

步骤 1:创建 AWS Cloud9 环境

在创建 Lambda 函数之前,您需要创建和配置 Amazon DocumentDB 集群。本教程中的集群设置步骤基于 Amazon DocumentDB 入门中的过程。

注意

如果已设置 Amazon DocumentDB 集群,请确保激活更改流并创建必要的接口 VPC 端点。然后,您可以直接跳到函数创建步骤。

首先,创建一个 AWS Cloud9 环境。在本教程中,您将使用此环境来连接和查询 Amazon DocumentDB 集群。

创建 AWS Cloud9 环境
  1. 打开 AWS Cloud9 控制台并选择创建环境

  2. 使用以下配置创建环境:

    • 详细信息下:

      • 名称DocumentDBCloud9Environment

      • 环境类型:新 EC2 实例

    • 新 EC2 实例下:

      • 实例类型t2.micro(1 GiB RAM + 1 个 vCPU)

      • 平台:Amazon Linux 2

      • 超时:30 分钟

    • 网络设置下:

      • 连接:AWS Systems Manager(SSM)

      • 展开 VPC 设置下拉列表。

      • Amazon Virtual Private Cloud(VPC):选择默认 VPC

      • 子网:无首选项

    • 保留所有其他默认设置。

  3. 选择创建。预配置新的 AWS Cloud9 环境可能需要花费几分钟的时间。

创建 Amazon EC2 安全组

步骤 2:创建 Amazon EC2 安全组

接下来,使用允许 Amazon DocumentDB 集群和 AWS Cloud9 环境之间传输流量的规则创建 Amazon EC2 安全组

创建 EC2 安全组
  1. 打开 EC2 控制台。在网络与安全性下,选择安全组

  2. 选择创建安全组

  3. 使用以下配置创建安全组:

    • 基本详细信息下:

      • 安全组名称:DocDBTutorial

      • 描述:用于在 AWS Cloud9 与 Amazon DocumentDB 之间传输流量的安全组。

      • VPC:选择默认 VPC

    • Inbound rules (入站规则)下面,选择 Add rule (添加规则)。使用以下配置创建规则:

      • 类型:自定义 TCP

      • 端口范围:27017

      • Source:Custom

      • 旁边的搜索框中,为您在上一步中创建的 AWS Cloud9 环境选择安全组。要查看可用安全组列表,请在搜索框中输入 cloud9。选择名为 aws-cloud9-<environment_name> 的安全组。

    • 保留所有其他默认设置。

  4. 选择创建安全组

创建 Amazon DocumentDB 集群

步骤 3:创建 Amazon DocumentDB 集群

在此步骤中,您将使用上一步中的安全组创建 Amazon DocumentDB 集群。

创建 Amazon DocumentDB 集群
  1. 打开 Amazon DocumentDB 控制台。在集群下,选择创建

  2. 使用以下配置创建集群:

    • 对于集群类型,选择“基于实例的集群”。

    • 配置下:

      • 引擎版本:5.0.0

      • 实例类:db.t3.medium(可免费试用)

      • 实例数:1。

    • 身份验证下:

      • 输入连接到集群所需的用户名密码(与上一步中用于创建密钥的凭证相同)。在确认密码中,确认您的密码。

    • 开启显示高级设置

    • 网络设置下:

      • Virtual Private Cloud(VPC):选择默认 VPC

      • 子网组:默认

      • VPC 安全组:除 default (VPC) 之外,选择您在上一步中创建的 DocDBTutorial (VPC) 安全组。

    • 保留所有其他默认设置。

  3. 选择创建集群。预置 Amazon DocumentDB 集群可能需要花费几分钟的时间。

在 Secrets Manager 中创建密钥

步骤 4 在 Secrets Manager 中创建密钥

要手动访问 Amazon DocumentDB 集群,必须提供用户名和密码凭证。要允许 Lambda 访问集群,您必须在设置事件源映射时,提供包含这些相同访问凭证的 Secrets Manager 密钥。在此步骤中,您将创建此密钥。

在 Secrets Manager 中创建密钥
  1. 打开 Secrets Manager 控制台并选择存储新密钥

  2. 对于选择密钥类型,请选择以下选项之一:

    • 基本详细信息下:

      • 密钥类型:用于 Amazon DocumentDB 数据库的凭证

      • 凭证下,输入用于访问 Amazon DocumentDB 集群的用户名和密码。

      • 数据库:选择 Amazon DocumentDB 集群。

      • 选择下一步

  3. 对于配置密钥,请选择以下选项之一:

    • 密钥名称DocumentDBSecret

    • 选择下一步

  4. 选择下一步

  5. 选择 Store (存储)。

  6. 刷新控制台以验证 DocumentDBSecret 密钥是否成功存储。

记下密钥的密钥 ARN。您将在后面的步骤中用到它。

安装 mongo shell

步骤 5 安装 Mongo Shell

在此步骤中,您将在 AWS Cloud9 环境中安装 Mongo Shell。Mongo Shell 是一个命令行实用程序,用于连接和查询 Amazon DocumentDB 集群。

在 AWS Cloud9 环境中安装 Mongo Shell
  1. 打开AWS Cloud9控制台。在之前创建的 DocumentDBCloud9Environment 环境旁边,单击 AWS Cloud9 IDE 列下的打开链接。

  2. 在终端窗口中,使用以下命令创建 MongoDB 存储库文件:

    echo -e "[mongodb-org-5.0] \nname=MongoDB Repository\nbaseurl=https://repo.mongodb.org/yum/amazon/2/mongodb-org/5.0/x86_64/\ngpgcheck=1 \nenabled=1 \ngpgkey=https://www.mongodb.org/static/pgp/server-5.0.asc" | sudo tee /etc/yum.repos.d/mongodb-org-5.0.repo
  3. 然后,使用以下命令安装 Mongo Shell:

    sudo yum install -y mongodb-org-shell
  4. 要加密传输中数据,请下载 Amazon DocumentDB 的公有密钥。以下命令将下载名为 global-bundle.pem 的文件:

    wget https://truststore.pki.rds.amazonaws.com/global/global-bundle.pem

连接到 Amazon DocumentDB 集群

步骤 6:连接到 Amazon DocumentDB 集群

现在,您已经准备好使用 Mongo Shell 连接到 Amazon DocumentDB 集群。

连接到 Amazon DocumentDB 集群
  1. 打开 Amazon DocumentDB 控制台。在集群下,通过选择集群标识符来选择集群。

  2. 连接和安全性选项卡中,在使用 Mongo Shell 连接到此集群下,选择复制

  3. 在 AWS Cloud9 环境中,将此命令粘贴到终端。将 <insertYourPassword> 替换为正确的密码。

输入此命令后,如果命令提示符变为 rs0:PRIMARY>,则表示您已连接到 Amazon DocumentDB 集群。

激活更改流

步骤 7 激活更改流

在本教程中,您将跟踪对 Amazon DocumentDB 集群中 docdbdemo 数据库 products 集合的更改。您可以通过激活更改流来完成此操作。首先,创建 docdbdemo 数据库并通过插入记录对其进行测试。

在集群内创建新数据库
  1. 在 AWS Cloud9 环境中,确保仍然连接到 Amazon DocumentDB 集群

  2. 在终端窗口中,使用以下命令创建一个名为 docdbdemo 的新数据库:

    use docdbdemo
  3. 然后,使用以下命令将记录插入到 docdbdemo 中:

    db.products.insert({"hello":"world"})

    应看到类似如下内容的输出:

    WriteResult({ "nInserted" : 1 })
  4. 使用以下命令列出所有数据库:

    show dbs

    确保输出包含 docdbdemo 数据库:

    docdbdemo 0.000GB

接下来,使用以下命令激活 docdbdemo 数据库 products 集合上的更改流:

db.adminCommand({modifyChangeStreams: 1, database: "docdbdemo", collection: "products", enable: true});

应看到类似如下内容的输出:

{ "ok" : 1, "operationTime" : Timestamp(1680126165, 1) }

创建接口 VPC 端点

步骤 8 创建接口 VPC 端点

接下来,创建接口 VPC 端点,以确保 Lambda 和 Secrets Manager(稍后用于存储集群访问凭证)能够连接到默认 VPC。

创建接口 VPC 端点
  1. 打开 VPC 控制台。在左侧菜单的虚拟私有云下,选择端点

  2. 选择创建端点。使用以下配置创建端点:

    • 对于名称标签,输入 lambda-default-vpc

    • 对于服务类别,选择 AWS 服务。

    • 对于服务,在搜索框中输入 lambda。选择格式为 com.amazonaws.<region>.lambda 的服务。

    • 对于 VPC,选择默认 VPC

    • 对于子网,选中每个可用区旁边的复选框。为每个可用区选择正确的子网 ID。

    • 对于 IP 地址类型,选择 IPv4。

    • 对于安全组,选择默认的 VPC 安全组(组名称为 default),以及之前创建的安全组(组名称为 DocDBTutorial)。

    • 保留所有其他默认设置。

    • 选择创建端点

  3. 再次选择创建端点。使用以下配置创建端点:

    • 对于名称标签,输入 secretsmanager-default-vpc

    • 对于服务类别,选择 AWS 服务。

    • 对于服务,在搜索框中输入 secretsmanager。选择格式为 com.amazonaws.<region>.secretsmanager 的服务。

    • 对于 VPC,选择默认 VPC

    • 对于子网,选中每个可用区旁边的复选框。为每个可用区选择正确的子网 ID。

    • 对于 IP 地址类型,选择 IPv4。

    • 对于安全组,选择默认的 VPC 安全组(组名称为 default),以及之前创建的安全组(组名称为 DocDBTutorial)。

    • 保留所有其他默认设置。

    • 选择创建端点

本教程的集群设置部分到此完成。

创建执行角色

步骤 9 创建执行角色

在接下来的一组步骤中,您将创建 Lambda 函数。首先,您需要创建执行角色,以向函数授予访问集群的权限。为此,您可以先创建 IAM policy,然后将此策略附加到 IAM 角色。

创建 IAM policy
  1. 在 IAM 控制台中打开策略页面,然后选择创建策略

  2. 选择 JSON 选项卡。在以下策略中,将语句最后一行中的 Secrets Manager 资源 ARN 替换为之前的密钥 ARN,然后将策略复制到编辑器中。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "LambdaESMNetworkingAccess", "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeVpcs", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeSecurityGroups", "kms:Decrypt" ], "Resource": "*" }, { "Sid": "LambdaDocDBESMAccess", "Effect": "Allow", "Action": [ "rds:DescribeDBClusters", "rds:DescribeDBClusterParameters", "rds:DescribeDBSubnetGroups" ], "Resource": "*" }, { "Sid": "LambdaDocDBESMGetSecretValueAccess", "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue" ], "Resource": "arn:aws:secretsmanager:us-east-1:123456789012:secret:DocumentDBSecret" } ] }
  3. 选择下一步:标签,然后选择下一步:审核

  4. 对于 Name (名称),请输入 AWSDocumentDBLambdaPolicy

  5. 选择创建策略

创建 IAM 角色
  1. 在 IAM 控制台中打开角色页面,然后选择创建角色

  2. 对于选择可信实体,请选择以下选项之一:

    • 可信实体类型 – AWS 服务

    • 应用场景 – Lambda

    • 选择下一步

  3. 对于添加权限,选择刚刚创建的 AWSDocumentDBLambdaPolicy 策略以及 AWSLambdaBasicExecutionRole,以向函数授予写入 Amazon CloudWatch Logs 的权限。

  4. 选择下一步

  5. 对于 Role name(角色名称),输入 AWSDocumentDBLambdaExecutionRole

  6. 请选择 Create role(创建角色)。

创建 Lambda 函数

步骤 10 创建 Lambda 函数

以下示例代码接收 Amazon DocumentDB 事件输入并对其所包含的消息进行处理。

.NET
AWS SDK for .NET
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 .NET 将 Amazon DocumentDB 事件与 Lambda 结合使用。

using Amazon.Lambda.Core; using System.Text.Json; using System; using System.Collections.Generic; using System.Text.Json.Serialization; //Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace LambdaDocDb; public class Function { /// <summary> /// Lambda function entry point to process Amazon DocumentDB events. /// </summary> /// <param name="event">The Amazon DocumentDB event.</param> /// <param name="context">The Lambda context object.</param> /// <returns>A string to indicate successful processing.</returns> public string FunctionHandler(Event evnt, ILambdaContext context) { foreach (var record in evnt.Events) { ProcessDocumentDBEvent(record, context); } return "OK"; } private void ProcessDocumentDBEvent(DocumentDBEventRecord record, ILambdaContext context) { var eventData = record.Event; var operationType = eventData.OperationType; var databaseName = eventData.Ns.Db; var collectionName = eventData.Ns.Coll; var fullDocument = JsonSerializer.Serialize(eventData.FullDocument, new JsonSerializerOptions { WriteIndented = true }); context.Logger.LogLine($"Operation type: {operationType}"); context.Logger.LogLine($"Database: {databaseName}"); context.Logger.LogLine($"Collection: {collectionName}"); context.Logger.LogLine($"Full document:\n{fullDocument}"); } public class Event { [JsonPropertyName("eventSourceArn")] public string EventSourceArn { get; set; } [JsonPropertyName("events")] public List<DocumentDBEventRecord> Events { get; set; } [JsonPropertyName("eventSource")] public string EventSource { get; set; } } public class DocumentDBEventRecord { [JsonPropertyName("event")] public EventData Event { get; set; } } public class EventData { [JsonPropertyName("_id")] public IdData Id { get; set; } [JsonPropertyName("clusterTime")] public ClusterTime ClusterTime { get; set; } [JsonPropertyName("documentKey")] public DocumentKey DocumentKey { get; set; } [JsonPropertyName("fullDocument")] public Dictionary<string, object> FullDocument { get; set; } [JsonPropertyName("ns")] public Namespace Ns { get; set; } [JsonPropertyName("operationType")] public string OperationType { get; set; } } public class IdData { [JsonPropertyName("_data")] public string Data { get; set; } } public class ClusterTime { [JsonPropertyName("$timestamp")] public Timestamp Timestamp { get; set; } } public class Timestamp { [JsonPropertyName("t")] public long T { get; set; } [JsonPropertyName("i")] public int I { get; set; } } public class DocumentKey { [JsonPropertyName("_id")] public Id Id { get; set; } } public class Id { [JsonPropertyName("$oid")] public string Oid { get; set; } } public class Namespace { [JsonPropertyName("db")] public string Db { get; set; } [JsonPropertyName("coll")] public string Coll { get; set; } } }
Go
适用于 Go V2 的 SDK
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Go 将 Amazon DocumentDB 事件与 Lambda 结合使用。

package main import ( "context" "encoding/json" "fmt" "github.com/aws/aws-lambda-go/lambda" ) type Event struct { Events []Record `json:"events"` } type Record struct { Event struct { OperationType string `json:"operationType"` NS struct { DB string `json:"db"` Coll string `json:"coll"` } `json:"ns"` FullDocument interface{} `json:"fullDocument"` } `json:"event"` } func main() { lambda.Start(handler) } func handler(ctx context.Context, event Event) (string, error) { fmt.Println("Loading function") for _, record := range event.Events { logDocumentDBEvent(record) } return "OK", nil } func logDocumentDBEvent(record Record) { fmt.Printf("Operation type: %s\n", record.Event.OperationType) fmt.Printf("db: %s\n", record.Event.NS.DB) fmt.Printf("collection: %s\n", record.Event.NS.Coll) docBytes, _ := json.MarshalIndent(record.Event.FullDocument, "", " ") fmt.Printf("Full document: %s\n", string(docBytes)) }
JavaScript
适用于 JavaScript 的 SDK(v3)
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 JavaScript 将 Amazon DocumentDB 事件与 Lambda 结合使用。

console.log('Loading function'); exports.handler = async (event, context) => { event.events.forEach(record => { logDocumentDBEvent(record); }); return 'OK'; }; const logDocumentDBEvent = (record) => { console.log('Operation type: ' + record.event.operationType); console.log('db: ' + record.event.ns.db); console.log('collection: ' + record.event.ns.coll); console.log('Full document:', JSON.stringify(record.event.fullDocument, null, 2)); };

使用 TypeScript 将 Amazon DocumentDB 事件与 Lambda 结合使用

import { DocumentDBEventRecord, DocumentDBEventSubscriptionContext } from 'aws-lambda'; console.log('Loading function'); export const handler = async ( event: DocumentDBEventSubscriptionContext, context: any ): Promise<string> => { event.events.forEach((record: DocumentDBEventRecord) => { logDocumentDBEvent(record); }); return 'OK'; }; const logDocumentDBEvent = (record: DocumentDBEventRecord): void => { console.log('Operation type: ' + record.event.operationType); console.log('db: ' + record.event.ns.db); console.log('collection: ' + record.event.ns.coll); console.log('Full document:', JSON.stringify(record.event.fullDocument, null, 2)); };
PHP
适用于 PHP 的 SDK
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 PHP 将 Amazon DocumentDB 事件与 Lambda 结合使用。

<?php require __DIR__.'/vendor/autoload.php'; use Bref\Context\Context; use Bref\Event\Handler; class DocumentDBEventHandler implements Handler { public function handle($event, Context $context): string { $events = $event['events'] ?? []; foreach ($events as $record) { $this->logDocumentDBEvent($record['event']); } return 'OK'; } private function logDocumentDBEvent($event): void { // Extract information from the event record $operationType = $event['operationType'] ?? 'Unknown'; $db = $event['ns']['db'] ?? 'Unknown'; $collection = $event['ns']['coll'] ?? 'Unknown'; $fullDocument = $event['fullDocument'] ?? []; // Log the event details echo "Operation type: $operationType\n"; echo "Database: $db\n"; echo "Collection: $collection\n"; echo "Full document: " . json_encode($fullDocument, JSON_PRETTY_PRINT) . "\n"; } } return new DocumentDBEventHandler();
Python
SDK for Python (Boto3)
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 将 Amazon DocumentDB 事件与 Lambda 结合使用。

import json def lambda_handler(event, context): for record in event.get('events', []): log_document_db_event(record) return 'OK' def log_document_db_event(record): event_data = record.get('event', {}) operation_type = event_data.get('operationType', 'Unknown') db = event_data.get('ns', {}).get('db', 'Unknown') collection = event_data.get('ns', {}).get('coll', 'Unknown') full_document = event_data.get('fullDocument', {}) print(f"Operation type: {operation_type}") print(f"db: {db}") print(f"collection: {collection}") print("Full document:", json.dumps(full_document, indent=2))
Ruby
适用于 Ruby 的 SDK
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Ruby 将 Amazon DocumentDB 事件与 Lambda 结合使用。

require 'json' def lambda_handler(event:, context:) event['events'].each do |record| log_document_db_event(record) end 'OK' end def log_document_db_event(record) event_data = record['event'] || {} operation_type = event_data['operationType'] || 'Unknown' db = event_data.dig('ns', 'db') || 'Unknown' collection = event_data.dig('ns', 'coll') || 'Unknown' full_document = event_data['fullDocument'] || {} puts "Operation type: #{operation_type}" puts "db: #{db}" puts "collection: #{collection}" puts "Full document: #{JSON.pretty_generate(full_document)}" end
Rust
适用于 Rust 的 SDK
注意

查看 GitHub,了解更多信息。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Rust 将 Amazon DocumentDB 事件与 Lambda 结合使用。

use lambda_runtime::{service_fn, tracing, Error, LambdaEvent}; use aws_lambda_events::{ event::documentdb::{DocumentDbEvent, DocumentDbInnerEvent}, }; // Built with the following dependencies: //lambda_runtime = "0.11.1" //serde_json = "1.0" //tokio = { version = "1", features = ["macros"] } //tracing = { version = "0.1", features = ["log"] } //tracing-subscriber = { version = "0.3", default-features = false, features = ["fmt"] } //aws_lambda_events = "0.15.0" async fn function_handler(event: LambdaEvent<DocumentDbEvent>) ->Result<(), Error> { tracing::info!("Event Source ARN: {:?}", event.payload.event_source_arn); tracing::info!("Event Source: {:?}", event.payload.event_source); let records = &event.payload.events; if records.is_empty() { tracing::info!("No records found. Exiting."); return Ok(()); } for record in records{ log_document_db_event(record); } tracing::info!("Document db records processed"); // Prepare the response Ok(()) } fn log_document_db_event(record: &DocumentDbInnerEvent)-> Result<(), Error>{ tracing::info!("Change Event: {:?}", record.event); Ok(()) } #[tokio::main] async fn main() -> Result<(), Error> { tracing_subscriber::fmt() .with_max_level(tracing::Level::INFO) .with_target(false) .without_time() .init(); let func = service_fn(function_handler); lambda_runtime::run(func).await?; Ok(()) }
创建 Lambda 函数
  1. 将示例代码复制到名为 index.js 的文件中。

  2. 使用以下命令创建部署包。

    zip function.zip index.js
  3. 使用以下 CLI 命令创建函数。将 us-east-1 替换为 AWS 区域,并将 123456789012 替换为账户 ID。

    aws lambda create-function \ --function-name ProcessDocumentDBRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs20.x \ --region us-east-1 \ --role arn:aws:iam::123456789012:role/AWSDocumentDBLambdaExecutionRole

创建 Lambda 事件源映射

步骤 11 创建 Lambda 事件源映射

创建事件源映射,将 Amazon DocumentDB 更改流与 Lambda 函数相关联。创建此事件源映射后,AWS Lambda 即开始轮询该流。

创建事件源映射
  1. 在 Lambda 控制台中打开函数页面

  2. 选择您之前创建的 ProcessDocumentDBRecords 函数。

  3. 选择配置选项卡,然后从左侧菜单中选择触发器

  4. 选择添加触发器

  5. 触发器配置下,为源选择 Amazon DocumentDB

  6. 使用以下配置创建事件源映射:

    • Amazon DocumentDB 集群 – 选择之前创建的集群。

    • 数据库名称docdbdemo

    • 集合名称 – 产品

    • 批处理大小 – 1

    • 起始位置 – 最新

    • 身份验证 – BASIC_AUTH

    • Secrets Manager 密钥 – 选择刚刚创建的 DocumentDBSecret

    • 批处理时段 – 1

    • 完整文档配置 – UpdateLookup

  7. 选择添加。创建事件源映射可能需要花费几分钟的时间。

测试函数 – 手动调用

步骤 12 通过手动调用测试函数

要测试您是否正确创建函数和事件源映射,请使用 invoke 命令调用函数。为此,请先将以下事件 JSON 复制到名为 input.txt 的文件中:

{ "eventSourceArn": "arn:aws:rds:us-east-1:123456789012:cluster:canaryclusterb2a659a2-qo5tcmqkcl03", "events": [ { "event": { "_id": { "_data": "0163eeb6e7000000090100000009000041e1" }, "clusterTime": { "$timestamp": { "t": 1676588775, "i": 9 } }, "documentKey": { "_id": { "$oid": "63eeb6e7d418cd98afb1c1d7" } }, "fullDocument": { "_id": { "$oid": "63eeb6e7d418cd98afb1c1d7" }, "anyField": "sampleValue" }, "ns": { "db": "docdbdemo", "coll": "products" }, "operationType": "insert" } } ], "eventSource": "aws:docdb" }

然后,使用以下命令来调用包含此事件的函数:

aws lambda invoke \ --function-name ProcessDocumentDBRecords \ --cli-binary-format raw-in-base64-out \ --region us-east-1 \ --payload file://input.txt out.txt

您应该会看到如下响应:

{ "StatusCode": 200, "ExecutedVersion": "$LATEST" }

您可以通过查看 CloudWatch Logs 来验证函数是否成功处理该事件。

通过 CloudWatch Logs 验证手动调用
  1. 在 Lambda 控制台中打开函数页面

  2. 选择监控选项卡,然后选择查看 CloudWatch 日志。这会将您引导至 CloudWatch 控制台中与函数相关联的特定日志组。

  3. 选择最新的日志流。在日志消息中,您应该会看到事件 JSON。

测试函数 – 插入记录

步骤 13 通过插入记录来测试函数。

通过直接与 Amazon DocumentDB 数据库交互来测试端到端设置。在接下来的一组步骤中,您将插入记录,对其进行更新,然后将其删除。

插入记录
  1. 在 AWS Cloud9 环境中重新连接到 Amazon DocumentDB 集群

  2. 使用此命令以确保您当前正在使用 docdbdemo 数据库:

    use docdbdemo
  3. docdbdemo 数据库的 products 集合中插入记录:

    db.products.insert({"name":"Pencil", "price": 1.00})

测试函数 – 更新记录

步骤 14 通过更新记录来测试函数。

然后,使用以下命令更新您刚刚插入的记录:

db.products.update( { "name": "Pencil" }, { $set: { "price": 0.50 }} )

通过查看 CloudWatch Logs 来验证函数是否成功处理该事件。

测试函数 – 删除记录

步骤 15 通过删除记录来测试函数。

最后,使用以下命令删除您刚刚更新的记录:

db.products.remove( { "name": "Pencil" } )

通过查看 CloudWatch Logs 来验证函数是否成功处理该事件。

清除资源

除非您想要保留为本教程创建的资源,否则可立即将其删除。通过删除您不再使用的 AWS 资源,可防止您的 AWS 账户 产生不必要的费用。

删除 Lambda 函数
  1. 打开 Lamba 控制台的 Functions(函数)页面

  2. 选择您创建的函数。

  3. 依次选择操作删除

  4. 在文本输入字段中键入 delete,然后选择删除

删除执行角色
  1. 打开 IAM 控制台的角色页面

  2. 选择您创建的执行角色。

  3. 选择删除

  4. 在文本输入字段中输入角色名称,然后选择删除

删除 VPC 端点
  1. 打开 VPC 控制台。在左侧菜单的虚拟私有云下,选择端点

  2. 选择您创建的端点。

  3. 选择 Actions(操作)、Delete VPC Endpoint(删除 VPC 端点)。

  4. 在文本输入字段中输入 delete

  5. 选择删除

删除 Amazon DocumentDB 集群
  1. 打开 Amazon DocumentDB 控制台

  2. 选择您为本教程创建的 Amazon DocumentDB 集群,并禁用删除保护。

  3. 在主集群页面中,再次选择 Amazon DocumentDB 集群。

  4. 依次选择操作删除

  5. 对于创建最终集群快照,选择

  6. 在文本输入字段中输入 delete

  7. 选择删除

在 Secrets Manager 中删除密钥
  1. 打开 Secrets Manager 控制台

  2. 选择您为本教程创建的密钥。

  3. 依次选择操作删除密钥

  4. 选择计划删除

删除 Amazon EC2 安全组
  1. 打开 EC2 控制台。在网络与安全性下,选择安全组

  2. 选择您为本教程创建的安全组。

  3. 依次选择操作删除安全组

  4. 选择删除

删除 AWS Cloud9 环境
  1. 打开AWS Cloud9控制台

  2. 选择您为本教程创建的环境。

  3. 选择删除

  4. 在文本输入字段中输入 delete

  5. 选择 Delete(删除)。