為應用程式目的地建立 Lambda 函數 - 亞馬遜 Kinesis SQL 應用程式資料分析開發人員指南

對於新專案,我們建議您使用適用於 Apache Flink Studio 的全新受管理服務,取代適用於應用程式的 Kinesis Data Analytics。SQLManaged Service for Apache Flink Studio 易於使用且具備進階分析功能,讓您在幾分鐘內建置複雜的串流處理應用程式。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

為應用程式目的地建立 Lambda 函數

您的 Kinesis Data Analytics 應用程式可以使用 AWS Lambda 函數做為輸出。Kinesis Data Analytics 提供範本,可讓您建立 Lambda 函數作為應用程式的目的地使用。使用這些範本做為應用程式後續處理輸出的起點。

在 Node.js 中建立一個 Lambda 函數目的地

主控台提供在 Node.js 中建立目的地 Lambda 函數的下列範本:

Lambda 作為輸出藍圖 語言與版本 描述
kinesis-analytics-output Node.js 12.x 將輸出記錄從 Kinesis Data Analytics 應用程式傳送至自訂目的地。

在 Python 中建立一個 Lambda 函數目的地

主控台提供在 Python 中建立目的地 Lambda 函數的下列範本:

Lambda 作為輸出藍圖 語言與版本 描述
kinesis-analytics-output-sns Python 2.7 將輸出記錄從 Kinesis Data Analytics 應用程式傳送至 Amazon SNS。
kinesis-analytics-output-ddb Python 2.7 將輸出記錄從 Kinesis Data Analytics 應用程式傳送至 Amazon DynamoDB。

在 Java 中建立一個 Lambda 函數目的地

如要在 Java 中建立目的地 Lambda 函數,請使用 Java 事件類別。

下列程式碼會示範使用 Java 的範例目的地 Lambda 函數:

public class LambdaFunctionHandler implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> { @Override public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event, Context context) { context.getLogger().log("InvocatonId is : " + event.invocationId); context.getLogger().log("ApplicationArn is : " + event.applicationArn); List<KinesisAnalyticsOutputDeliveryResponse.Record> records = new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>(); KinesisAnalyticsOutputDeliveryResponse response = new KinesisAnalyticsOutputDeliveryResponse(records); event.records.stream().forEach(record -> { context.getLogger().log("recordId is : " + record.recordId); context.getLogger().log("record retryHint is :" + record.lambdaDeliveryRecordMetadata.retryHint); // Add logic here to transform and send the record to final destination of your choice. response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResponse.Result.Ok)); }); return response; } }

在 .NET 中建立一個 Lambda 函數目的地

如要在 .NET 中建立目的地 Lambda 函數,請使用 .NET 事件類別。

下列程式碼會示範使用 C# 的範例目的地 Lambda 函數:

public class Function { public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context) { context.Logger.LogLine($"InvocationId: {evnt.InvocationId}"); context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}"); var response = new KinesisAnalyticsOutputDeliveryResponse { Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>() }; foreach (var record in evnt.Records) { context.Logger.LogLine($"\tRecordId: {record.RecordId}"); context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}"); context.Logger.LogLine($"\tData: {record.DecodeData()}"); // Add logic here to send to the record to final destination of your choice. var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record { RecordId = record.RecordId, Result = KinesisAnalyticsOutputDeliveryResponse.OK }; response.Records.Add(deliveredRecord); } return response; } }

如需在 .NET 中建立用於預先處理和目的地的 Lambda 函數詳細資訊,請參閱 Amazon.Lambda.KinesisAnalyticsEvents