Transforming Data Using Operators in Kinesis Data Analytics for Apache Flink With the DataStream API - Amazon Kinesis Data Analytics

Transforming Data Using Operators in Kinesis Data Analytics for Apache Flink With the DataStream API

To transform incoming data in a Kinesis Data Analytics for Apache Flink application, you use an Apache Flink operator. An Apache Flink operator transforms one or more data streams into a new data stream. The new data stream contains modified data from the original data stream. Apache Flink provides more than 25 pre-built stream processing operators. For more information, see Operators in the Apache Flink Documentation.

This topic contains the following sections:

Transform Operators

The following is an example of a simple text transformation on one of the fields of a JSON data stream.

This code creates a transformed data stream. The new data stream has the same data as the original stream, with the string " Company" appended to the contents of the TICKER field.

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

Aggregation Operators

The following is an example of an aggregation operator. The code creates an aggregated data stream. The operator creates a 5-second tumbling window and returns the sum of the PRICE values for the records in the window with the same TICKER value.

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

For a complete code example that uses operators, see Getting Started (DataStream API). Source code for the Getting Started application is available at Getting Started in the Kinesis Data Analytics Java Examples GitHub repository.