Creating Lambda Functions for Preprocessing - Amazon Kinesis Data Analytics for SQL Applications Developer Guide

For new projects, we recommend that you use the new Managed Service for Apache Flink Studio over Kinesis Data Analytics for SQL Applications. Managed Service for Apache Flink Studio combines ease of use with advanced analytical capabilities, enabling you to build sophisticated stream processing applications in minutes.

Creating Lambda Functions for Preprocessing

Your Amazon Kinesis Data Analytics application can use Lambda functions for preprocessing records as they are ingested into the application. Kinesis Data Analytics provides the following templates on the console to use as a starting point for preprocessing your data.

Creating a Preprocessing Lambda Function in Node.js

The following templates for creating a preprocessing Lambda function in Node.js are available on the Kinesis Data Analytics console:

| Lambda Blueprint | Language and version | Description |
| --- | --- | --- |
| General Kinesis Data Analytics Input Processing | Node.js 6.10 | A Kinesis Data Analytics record preprocessor that receives JSON or CSV records as input and then returns them with a processing status. Use this processor as a starting point for custom transformation logic. |
| Compressed Input Processing | Node.js 6.10 | A Kinesis Data Analytics record processor that receives compressed (GZIP or Deflate compressed) JSON or CSV records as input and returns decompressed records with a processing status. |
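The Node.js blueprint source is not reproduced on this page. As a rough illustration of what compressed input processing involves, the following Python sketch decodes each record, decompresses it, and returns it with a processing status. It is not the blueprint itself: the `decompress` helper and its GZIP-then-Deflate detection order are assumptions made for this example, and the event and response field names (`records`, `recordId`, `result`, `data`) mirror the ones used by the Java and .NET samples later in this topic.

```python
import base64
import gzip
import zlib


def decompress(payload: bytes) -> bytes:
    """Hypothetical helper: try GZIP first, then zlib-wrapped and raw Deflate."""
    if payload[:2] == b"\x1f\x8b":  # GZIP magic number
        return gzip.decompress(payload)
    try:
        return zlib.decompress(payload)  # Deflate with a zlib header
    except zlib.error:
        return zlib.decompress(payload, -zlib.MAX_WBITS)  # raw Deflate stream


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        try:
            # Record data arrives base64-encoded and must be returned the same way.
            raw = base64.b64decode(record["data"])
            data = base64.b64encode(decompress(raw)).decode("utf-8")
            result = "Ok"
        except (zlib.error, OSError, EOFError, ValueError):
            # Return the original payload, flagged as failed.
            data, result = record["data"], "ProcessingFailed"
        output.append({"recordId": record["recordId"], "result": result, "data": data})
    return {"records": output}
```

Records that cannot be decompressed are returned unchanged with a `ProcessingFailed` status rather than raising, so that one bad record does not fail the whole batch.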

Creating a Preprocessing Lambda Function in Python

The following templates for creating a preprocessing Lambda function in Python are available on the console:

| Lambda Blueprint | Language and version | Description |
| --- | --- | --- |
| General Kinesis Analytics Input Processing | Python 2.7 | A Kinesis Data Analytics record preprocessor that receives JSON or CSV records as input and then returns them with a processing status. Use this processor as a starting point for custom transformation logic. |
| KPL Input Processing | Python 2.7 | A Kinesis Data Analytics record processor that receives Kinesis Producer Library (KPL) aggregates of JSON or CSV records as input and returns disaggregated records with a processing status. |
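To give a sense of what "returns them with a processing status" means in practice, here is a minimal sketch of a general preprocessor. It is written for Python 3 and is not the Python 2.7 blueprint code. The `transform` function and the `preprocessed` field it adds are hypothetical placeholders for your own logic, and the sketch assumes JSON object records. The status values shown (`Ok`, `Dropped`, `ProcessingFailed`) are the ones the preprocessing response model accepts.

```python
import base64
import json


def transform(payload: str) -> str:
    """Hypothetical transformation: tag each JSON record as preprocessed."""
    doc = json.loads(payload)
    doc["preprocessed"] = True
    return json.dumps(doc)


def lambda_handler(event, context):
    output = []
    for record in event["records"]:
        # Default to passing the original (base64-encoded) data through.
        result, data = "Ok", record["data"]
        try:
            payload = base64.b64decode(record["data"]).decode("utf-8")
            if not payload.strip():
                # Blank records carry no information, so drop them.
                result = "Dropped"
            else:
                transformed = transform(payload)
                data = base64.b64encode(transformed.encode("utf-8")).decode("utf-8")
        except (ValueError, TypeError):
            # Covers invalid base64, invalid UTF-8, and invalid or non-object JSON.
            result = "ProcessingFailed"
        output.append({"recordId": record["recordId"], "result": result, "data": data})
    return {"records": output}
```

Every input record appears exactly once in the response, under its original `recordId`, whatever its status.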

Creating a Preprocessing Lambda Function in Java

To create a Lambda function in Java for preprocessing records, use the Java events classes.

The following code demonstrates a sample Lambda function that preprocesses records using Java:

    import java.util.ArrayList;
    import java.util.List;

    import com.amazonaws.services.lambda.runtime.Context;
    import com.amazonaws.services.lambda.runtime.RequestHandler;
    import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsInputPreprocessingResponse;
    import com.amazonaws.services.lambda.runtime.events.KinesisAnalyticsStreamsInputPreprocessingEvent;

    public class LambdaFunctionHandler implements
            RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> {

        @Override
        public KinesisAnalyticsInputPreprocessingResponse handleRequest(
                KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) {
            // Log the invocation-level metadata.
            context.getLogger().log("InvocationId is : " + event.invocationId);
            context.getLogger().log("StreamArn is : " + event.streamArn);
            context.getLogger().log("ApplicationArn is : " + event.applicationArn);

            List<KinesisAnalyticsInputPreprocessingResponse.Record> records = new ArrayList<>();
            KinesisAnalyticsInputPreprocessingResponse response =
                    new KinesisAnalyticsInputPreprocessingResponse(records);

            event.records.forEach(record -> {
                context.getLogger().log("recordId is : " + record.recordId);
                context.getLogger().log("record approximateArrivalTimestamp is : "
                        + record.kinesisStreamRecordMetadata.approximateArrivalTimestamp);
                // Add your record.data pre-processing logic here.
                // response.records.add(new KinesisAnalyticsInputPreprocessingResponse.Record(
                //         record.recordId,
                //         KinesisAnalyticsInputPreprocessingResponse.Result.Ok,
                //         <preprocessedrecordData>));
            });
            return response;
        }
    }

Creating a Preprocessing Lambda Function in .NET

To create a Lambda function in .NET for preprocessing records, use the .NET events classes.

The following code demonstrates a sample Lambda function that preprocesses records using C#:

    using System.Collections.Generic;
    using Amazon.Lambda.Core;
    using Amazon.Lambda.KinesisAnalyticsEvents;

    public class Function
    {
        public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(
            KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context)
        {
            // Log the invocation-level metadata.
            context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
            context.Logger.LogLine($"StreamArn: {evnt.StreamArn}");
            context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

            var response = new KinesisAnalyticsInputPreprocessingResponse
            {
                Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>()
            };

            foreach (var record in evnt.Records)
            {
                context.Logger.LogLine($"\tRecordId: {record.RecordId}");
                context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}");
                context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}");
                context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}");
                context.Logger.LogLine($"\tData: {record.DecodeData()}");

                // Add your record preprocessing logic here.
                // This sample converts the record data to uppercase.
                var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record
                {
                    RecordId = record.RecordId,
                    Result = KinesisAnalyticsInputPreprocessingResponse.OK
                };
                preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
                response.Records.Add(preprocessedRecord);
            }
            return response;
        }
    }

For more information about creating Lambda functions for preprocessing and destinations in .NET, see Amazon.Lambda.KinesisAnalyticsEvents.