Apache Flink용 매니지드 서비스에서 싱크를 사용하여 데이터 쓰기 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink는 이전에 Amazon Kinesis Data Analytics for Apache Flink로 알려졌습니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Flink용 매니지드 서비스에서 싱크를 사용하여 데이터 쓰기

애플리케이션 코드에서 모든 Apache Flink 싱크 커넥터를 사용하여 Kinesis Data Streams 및 DynamoDB와 같은 AWS 서비스를 포함한 외부 시스템에 쓸 수 있습니다.

또한 Apache Flink는 파일 및 소켓용 싱크를 제공하며, 사용자 지정 싱크를 구현할 수 있습니다. 지원되는 여러 싱크 중에서 자주 사용되는 싱크는 다음과 같습니다.

Kinesis Data Streams

Apache Flink는 Apache Flink 설명서에서 Kinesis Data Streams 커넥터에 대한 정보를 제공합니다.

입력 및 출력에 Kinesis 데이터 스트림을 사용하는 애플리케이션의 예는 시작하기 (API) DataStream 섹션을 참조하세요.

아파치 카프카 및 아마존 매니지드 스트리밍 (MSK) 용 아파치 카프카

Apache Flink Kafka 커넥터는 Apache Kafka 및 Amazon MSK에 데이터를 게시하기 위한 광범위한 지원을 제공하며, 여기에는 정확히 한 번만 적용되는 보장이 포함됩니다. Kafka에 쓰는 방법을 알아보려면 Apache Flink 설명서에서 Kafka 커넥터 예제를 참조하십시오.

Amazon S3

Amazon S3 버킷에 객체를 쓰는 데 Apache Flink StreamingFileSink를 사용할 수 있습니다.

S3에 객체를 쓰는 방법에 대한 예는 예: Amazon S3 버킷에 쓰기 섹션을 참조하세요.

Firehose

Firehose 서비스를 사용하여 애플리케이션 출력을 저장하기 위한 안정적이고 확장 가능한 Apache Flink FlinkKinesisFirehoseProducer 싱크입니다. 이 섹션에서는 Maven 프로젝트를 설정하여 FlinkKinesisFirehoseProducer를 생성하고 사용하는 방법을 설명합니다.

FlinkKinesisFirehoseProducer 생성

다음 코드 예는 FlinkKinesisFirehoseProducer를 생성하는 방법을 보여 줍니다.

Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

FlinkKinesisFirehoseProducer 코드 예

다음 코드 예제는 Apache Flink 데이터 스트림을 만들고 구성한 FlinkKinesisFirehoseProducer 다음 Firehose 서비스로 데이터를 전송하는 방법을 보여줍니다.

package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }

Firehose 싱크 사용 방법에 대한 전체 자습서는 을 참조하십시오. 예: Firehose에 쓰기