Trasformazione dei dati utilizzando gli operatori in Managed Service for Apache Flink con l'API DataStream - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Trasformazione dei dati utilizzando gli operatori in Managed Service for Apache Flink con l'API DataStream

Per trasformare i dati in entrata in un servizio gestito per Apache Flink viene utilizzato un operatore Apache Flink. Un operatore Apache Flink trasforma uno o più flussi di dati in un nuovo flusso di dati. Il nuovo flusso di dati contiene dati modificati dal flusso di dati originale. Apache Flink offre più di 25 operatori di elaborazione di flussi predefiniti. Per ulteriori informazioni, consulta Operatori nella documentazione di Apache Flink.

Questo argomento contiene le sezioni seguenti:

Operatori di trasformazione

Di seguito è riportato un esempio di semplice trasformazione del testo su uno dei campi di un flusso di dati JSON.

Questo codice crea un flusso di dati trasformato. Il nuovo flusso di dati contiene gli stessi dati del flusso originale, con la stringa " Company" aggiunta al contenuto del campo TICKER.

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

Operatori di aggregazione

Di seguito è riportato un esempio di operatore di aggregazione. Il codice crea un flusso di dati aggregato. L'operatore crea una finestra a cascata di 5 secondi e restituisce la somma dei valori PRICE per i record nella finestra con lo stesso valore TICKER.

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

Per altri esempi di codice, consultaEsempi.