在 Apache Flink 托管服务中使用运算符转换数据 DataStream API - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Apache Flink 托管服务中使用运算符转换数据 DataStream API

要在中转换传入数据,您可以使用 Apache Flink 运算符。Apache Flink 运算符将一个或多个数据流转换为新的数据流。新数据流包含来自原始数据流的修改的数据。Apache Flink 提供超过 25 个预构建的流处理运算符。有关更多信息,请参阅 Apache Flink 文档中的运算符

本主题包含下列部分:

使用变换运算符

以下是对JSON数据流的其中一个字段进行简单文本转换的示例。

该代码创建转换的数据流。新数据流具有与原始流相同的数据,并在 TICKER 字段内容后面附加“ Company”字符串。

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

使用聚合运算符

以下是一个聚合运算符示例。该代码创建聚合的数据流。该运算符创建一个 5 秒的滚动窗口,并返回窗口中具有相同 TICKER 值的记录的 PRICE 值之和。

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

有关更多代码示例,请参阅创建和使用适用于 Apache Flink 应用程序的托管服务的示例