Transformieren von Daten mithilfe von Operatoren in Managed Service für Apache Flink mit der API DataStream - Managed Service für Apache Flink

Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Transformieren von Daten mithilfe von Operatoren in Managed Service für Apache Flink mit der API DataStream

Um eingehende Daten in einem Managed Service für Apache Flink umzuwandeln, verwenden Sie einen Apache-Flink-Operator. Ein Apache-Flink-Operator wandelt einen oder mehrere Datenströme in einen neuen Datenstrom um. Der neue Datenstrom enthält modifizierte Daten aus dem ursprünglichen Datenstrom. Apache Flink bietet mehr als 25 vorgefertigte Operatoren zur Stream-Verarbeitung. Weitere Informationen finden Sie unter Operatoren in der Apache Flink-Dokumentation.

Dieses Thema enthält die folgenden Abschnitte:

Transformieren Sie Operatoren

Im Folgenden finden Sie ein Beispiel für eine einfache Texttransformation in einem der Felder eines JSON-Datenstroms.

Dieser Code erstellt einen transformierten Datenstrom. Der neue Datenstrom enthält dieselben Daten wie der ursprüngliche Stream, wobei die Zeichenfolge „ Company“ an den Inhalt des TICKER-Felds angehängt wird.

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

Aggregationsoperatoren

Es folgt ein Beispiel für einen Aggregationsoperator. Der Code erstellt einen aggregierten Datenstrom. Der Operator erstellt ein 5-sekündiges rollierendes Fenster und gibt die Summe der PRICE-Werte für die Datensätze im Fenster mit demselben TICKER-Wert zurück.

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

Weitere Codebeispiele finden Sie unterBeispiele.