Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Managed Service for Apache Flink の演算子を使用してデータを変換する DataStream API
Apache Flink 用 Managed Service の受信データを変換するには、Apache Flink「オペレータ」を使用します。Apache Flink オペレータは 1 つ以上のデータストリームを新しいデータストリームに変換します。新しいデータストリームには、元のデータストリームから変更されたデータが含まれます。Apache Flink には 25 種類以上のストリーム処理オペレータがあらかじめ組み込まれています。詳細については、「Apache Flink ドキュメント」の「オペレータ
このトピックには、次のセクションが含まれています。
変換演算子を使用する
以下は、JSONデータストリームのいずれかのフィールドでの単純なテキスト変換の例です。
このコードは変換されたデータストリームを作成します。新しいデータストリームは、元のストリームと同じデータを持ち、 TICKER
フィールドの内容に文字列 Company
が追加されます。
DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );
集計演算子を使用する
次は集約オペレータの例です。このコードは集約されたデータストリームを作成します。このオペレータは 5 秒間のタンブリングウィンドウを作成し、ウィンドウ内の同じ TICKER
値を持つレコードの PRICE
値の合計を返します。
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });
コード例の詳細については、「」を参照してくださいManaged Service for Apache Flink アプリケーションの作成と使用の例。