API와 함께 Apache Flink용 관리 서비스에서 연산자를 사용하여 데이터 변환 DataStream - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

API와 함께 Apache Flink용 관리 서비스에서 연산자를 사용하여 데이터 변환 DataStream

Managed Service for Apache Flink에서 들어오는 데이터를 변환하려면 Apache Flink 연산자를 사용합니다. Apache Flink 연산자는 하나 이상의 데이터 스트림을 새 데이터 스트림으로 변환합니다. 새 데이터 스트림에는 원래 데이터 스트림에서 수정된 데이터가 포함됩니다. Apache Flink는 25개 이상의 사전 빌드된 스트림 처리 연산자를 제공합니다. 자세한 내용은 Apache Flink 설명서의 연산자를 참조하십시오.

이 주제는 다음 섹션을 포함하고 있습니다:

변환 연산자

다음은 JSON 데이터 스트림의 필드 중 하나에 대한 간단한 텍스트 변환의 예입니다.

이 코드는 변환된 데이터 스트림을 만듭니다. 새 데이터 스트림에는 원래 스트림과 동일한 데이터가 포함되며 TICKER 필드 내용에 “ Company” 문자열이 추가됩니다.

DataStream<ObjectNode> output = input.map( new MapFunction<ObjectNode, ObjectNode>() { @Override public ObjectNode map(ObjectNode value) throws Exception { return value.put("TICKER", value.get("TICKER").asText() + " Company"); } } );

집계 연산자

다음은 집계 연산자의 예시입니다. 코드는 집계된 데이터 스트림을 만듭니다. 연산자는 5초짜리 텀블링 윈도우를 만들고 창에 있는 레코드에 대한 PRICE 값의 합계를 같은 TICKER 값으로 반환합니다.

DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText()) .window(TumblingProcessingTimeWindows.of(Time.seconds(5))) .reduce((node1, node2) -> { double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble(); node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal)); return node1; });

코드 예제에 대한 자세한 내용은 을 참조하십시오예제.