为应用程序目标创建 &LAM; 函数 - 适用于 Amazon Kinesis Data Analytics·for·SQL 应用程序开发人员指南

对于新项目,建议您使用新的适用于 Apache Flink Studio 的托管服务,而不是使用 Kinesis Data Analytics for SQL 应用程序。Managed Service for Apache Flink Studio 不仅操作简单,还具有高级分析功能,使您能够在几分钟内构建复杂的流处理应用程序。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

为应用程序目标创建 &LAM; 函数

您的 Kinesis Data Analytics 应用程序 AWS Lambda 可以使用函数作为输出。 提供了一些模板以创建用作应用程序目标的 函数。可以将这些模板作为应用程序输出后处理的起点。

使用 Node.js 创建 &LAM; 函数目标

在控制台上提供了以下模板以使用 Node.js 创建目标 函数:

&LAM; 作为输出蓝图 语言和版本 描述
kinesis-analytics-output Node.js 12.x 将输出记录从 应用程序传输到自定义目标。

使用 Python 创建 &LAM; 函数目标

在控制台上提供了以下模板以使用 Python 创建目标 函数:

&LAM; 作为输出蓝图 语言和版本 描述
kinesis-analytics-output-sns Python 2.7 将 Kinesis Data Analytics 应用程序的输出记录传送到 Amazon SNS。
kinesis-analytics-output-ddb Python 2.7 将 Kinesis Data Analytics 应用程序的输出记录传送到 Amazon DynamoDB。

使用 Java 创建 &LAM; 函数目标

要使用 Java 创建目标 &LAM; 函数,请使用 Java 事件类。

以下代码说明了一个使用 Java 的示例目标 &LAM; 函数:

public class LambdaFunctionHandler implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> { @Override public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event, Context context) { context.getLogger().log("InvocatonId is : " + event.invocationId); context.getLogger().log("ApplicationArn is : " + event.applicationArn); List<KinesisAnalyticsOutputDeliveryResponse.Record> records = new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>(); KinesisAnalyticsOutputDeliveryResponse response = new KinesisAnalyticsOutputDeliveryResponse(records); event.records.stream().forEach(record -> { context.getLogger().log("recordId is : " + record.recordId); context.getLogger().log("record retryHint is :" + record.lambdaDeliveryRecordMetadata.retryHint); // Add logic here to transform and send the record to final destination of your choice. response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResponse.Result.Ok)); }); return response; } }

使用 .NET 创建 &LAM; 函数目标

要使用 .NET 创建目标 &LAM; 函数,请使用 .NET 事件类。

以下代码说明了一个使用 C# 的示例目标 &LAM; 函数:

public class Function { public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context) { context.Logger.LogLine($"InvocationId: {evnt.InvocationId}"); context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}"); var response = new KinesisAnalyticsOutputDeliveryResponse { Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>() }; foreach (var record in evnt.Records) { context.Logger.LogLine($"\tRecordId: {record.RecordId}"); context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}"); context.Logger.LogLine($"\tData: {record.DecodeData()}"); // Add logic here to send to the record to final destination of your choice. var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record { RecordId = record.RecordId, Result = KinesisAnalyticsOutputDeliveryResponse.OK }; response.Records.Add(deliveredRecord); } return response; } }

有关使用 .NET 创建 Lambda 函数以进行预处理或作为目标的更多信息,请参阅Amazon.Lambda.KinesisAnalyticsEvents