在本教程中,您将创建 Lambda 函数来处理 Amazon Kinesis 数据流中的事件。
-
自定义应用程序将记录写入流。
-
AWS Lambda 轮询流并在检测到流中的新记录时调用 Lambda 函数。
-
AWS Lambda 通过代入您在创建 Lambda 函数时指定的执行角色来运行 Lambda 函数。
先决条件
如果您尚未安装 AWS Command Line Interface,请按照安装或更新最新版本的 AWS CLI 中的步骤进行安装。
本教程需要命令行终端或 Shell 来运行命令。在 Linux 和 macOS 中,可使用您首选的 Shell 和程序包管理器。
注意
在 Windows 中,操作系统的内置终端不支持您经常与 Lambda 一起使用的某些 Bash CLI 命令(例如 zip
)。安装 Windows Subsystem for Linux
创建执行角色
创建执行角色,向您的函数授予访问 AWS 资源的权限。
创建执行角色
-
在 IAM 控制台中,打开 Roles(角色)页面
。 -
选择创建角色。
-
创建具有以下属性的角色。
-
Trusted entity(可信任的实体)– AWS Lambda。
-
Permissions(权限)– AWSLambdaKinesisExecutionRole。
-
Role name(角色名称)–
lambda-kinesis-role
。
-
AWSLambdaKinesisExecutionRole 策略具有该函数从 Kinesis 中读取项目并将日志写入 CloudWatch Logs 所需的权限。
创建函数
创建一个处理您的 Kinesis 消息的 Lambda 函数。函数代码会将 Kinesis 记录的事件 ID 和事件数据记录到 CloudWatch Logs 中。
本教程使用 Node.js 18.x 运行时系统,但我们还提供了其他运行时系统语言的示例代码。您可以选择以下框中的选项卡,查看适用于您感兴趣的运行时系统的代码。您将在此步骤中使用的 JavaScript 代码是 JavaScript 选项卡中显示的第一个示例。
- 适用于 .NET 的 SDK
-
注意
查看 GitHub,了解更多信息。在无服务器示例
存储库中查找完整示例,并了解如何进行设置和运行。 通过 .NET 将 Kinesis 事件与 Lambda 结合使用。
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }
创建函数
-
为项目创建一个目录,然后切换到该目录。
mkdir kinesis-tutorial cd kinesis-tutorial
-
将示例 JavaScript 代码复制到名为
index.js
的新文件中。 -
创建部署程序包。
zip function.zip index.js
-
使用
create-function
命令创建 Lambda 函数。aws lambda create-function --function-name ProcessKinesisRecords \ --zip-file fileb://function.zip --handler index.handler --runtime nodejs18.x \ --role arn:aws:iam::
111122223333
:role/lambda-kinesis-role
测试 Lambda 函数
使用 invoke
AWS Lambda CLI 命令和示例 Kinesis 事件手动调用 Lambda 函数。
测试 Lambda 函数
-
将以下 JSON 复制到文件中并将其保存为
input.txt
。{ "Records": [ { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream" } ] }
-
使用
invoke
命令将事件发送到该函数。aws lambda invoke --function-name ProcessKinesisRecords \ --cli-binary-format raw-in-base64-out \ --payload file://input.txt outputfile.txt
如果使用 cli-binary-format 版本 2,则 AWS CLI 选项是必需的。要将其设为默认设置,请运行
aws configure set cli-binary-format raw-in-base64-out
。有关更多信息,请参阅版本 2 的 AWS Command Line Interface 用户指南中的 AWS CLI 支持的全局命令行选项。响应将保存到
out.txt
中。
创建 Kinesis 流
使用 create-stream
命令创建流。
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
运行下面的 describe-stream
命令以获取流 ARN。
aws kinesis describe-stream --stream-name lambda-stream
您应看到以下输出:
{ "StreamDescription": { "Shards": [ { "ShardId": "shardId-000000000000", "HashKeyRange": { "StartingHashKey": "0", "EndingHashKey": "340282366920746074317682119384634633455" }, "SequenceNumberRange": { "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610" } } ], "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream", "StreamName": "lambda-stream", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 24, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "KeyId": null, "StreamCreationTimestamp": 1544828156.0 } }
您将使用下一步中的流 ARN 来将该流关联到您的 Lambda 函数。
在 AWS Lambda 中添加事件源
运行以下 AWS CLI add-event-source
命令。
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \ --batch-size 100 --starting-position LATEST
记下映射 ID 以供将来使用。您可以通过运行 list-event-source-mappings
命令获取事件源映射的列表。
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \ --event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
在该响应中,您可以验证状态值是否为 enabled
。可以禁用事件源映射,以临时暂停轮询而不丢失任何记录。
测试设置
要测试事件源映射,请将事件记录添加到 Kinesis 流中。--data
值是一个字符串,CLI 先将其编码为 base64 字符串,然后才发送到 Kinesis。您可以多次运行同一命令来向流中添加多条记录。
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \ --data "Hello, this is a test."
Lambda 使用执行角色来读取来自流的记录。然后它将调用 Lambda 函数,批量传递记录。该函数解码每条记录中的数据并记录它,将输出发送到 CloudWatch Logs 中。在 CloudWatch 控制台
清除资源
除非您想要保留为本教程创建的资源,否则可立即将其删除。通过删除您不再使用的 AWS 资源,可防止您的 AWS 账户 产生不必要的费用。
删除执行角色
-
打开 IAM 控制台的角色页面
。 -
选择您创建的执行角色。
-
选择删除。
-
在文本输入字段中输入角色名称,然后选择 Delete(删除)。
删除 Lambda 函数
-
打开 Lamba 控制台的 Functions(函数)页面
。 -
选择您创建的函数。
-
依次选择操作和删除。
-
在文本输入字段中键入
confirm
,然后选择 Delete(删除)。
删除 Kinesis 流
-
登录到 AWS Management Console,然后通过以下网址打开 Kinesis 控制台:https://console.aws.amazon.com/kinesisvideo/home
。 -
选择您创建的流。
-
依次选择 Actions(操作)和 Delete(删除)。
-
在文本输入字段中输入
delete
。 -
选择 Delete(删除)。