Write data using sinks in Managed Service for Apache Flink - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.


Write data using sinks in Managed Service for Apache Flink

In your application code, you can use any Apache Flink sink connector to write to external systems, including AWS services such as Kinesis Data Streams and DynamoDB.

Apache Flink also provides sinks for files and sockets, and you can implement custom sinks. Among the supported sinks, the following are frequently used:

Kinesis Data Streams

Apache Flink provides information about the Kinesis Data Streams connector in the Apache Flink documentation.

For an example of an application that uses a Kinesis data stream for input and output, see Tutorial: Get started with the DataStream API. A minimal sink sketch follows below.
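As an illustration only, the following minimal sketch shows one way to attach a Kinesis Data Streams sink, assuming the legacy FlinkKinesisProducer from the org.apache.flink.streaming.connectors.kinesis package that the other examples on this page use. The Region, stream name, and partition key are placeholder values, not part of an official example.

import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
import org.apache.flink.streaming.connectors.kinesis.config.AWSConfigConstants;

public class KinesisStreamsSinkSketch {

    // Attaches a Kinesis Data Streams sink to an existing DataStream.
    // "us-east-1" and "ExampleOutputStream" are placeholder values.
    public static void addKinesisSink(DataStream<String> input) {
        Properties producerConfig = new Properties();
        producerConfig.setProperty(AWSConfigConstants.AWS_REGION, "us-east-1");

        FlinkKinesisProducer<String> kinesisSink =
                new FlinkKinesisProducer<>(new SimpleStringSchema(), producerConfig);
        kinesisSink.setDefaultStream("ExampleOutputStream"); // destination stream name
        kinesisSink.setDefaultPartition("0");                // static partition key for this sketch

        input.addSink(kinesisSink);
    }
}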

Apache Kafka and Amazon Managed Streaming for Apache Kafka (Amazon MSK)

The Apache Flink Kafka connectors provide extensive support for publishing data to Apache Kafka and Amazon MSK, including exactly-once guarantees. To learn how to write to Kafka, see the Kafka connectors examples in the Apache Flink documentation; a minimal sketch follows below.
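As an illustration only, the following minimal sketch uses the KafkaSink builder API described in the Apache Flink documentation. The bootstrap servers string, topic name, and transactional ID prefix are placeholder values, and exactly-once delivery also requires checkpointing to be enabled in the application.

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;

public class KafkaSinkSketch {

    // Attaches a Kafka sink to an existing DataStream.
    // Broker addresses, topic name, and transactional ID prefix are placeholders.
    public static void addKafkaSink(DataStream<String> input) {
        KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
                .setBootstrapServers("broker-1:9092,broker-2:9092")
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("ExampleOutputTopic")
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build())
                // Exactly-once delivery requires checkpointing and a transactional ID prefix.
                .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
                .setTransactionalIdPrefix("example-app")
                .build();

        input.sinkTo(kafkaSink);
    }
}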

Amazon S3

You can use the Apache Flink StreamingFileSink to write objects to an Amazon S3 bucket.

For an example of how to write objects to S3, see Example: Writing to an Amazon S3 bucket. A minimal sketch follows below.
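As an illustration only, the following minimal sketch shows a row-format StreamingFileSink that writes strings to an S3 prefix. The bucket name and prefix are placeholder values; note that the sink finalizes files on checkpoints, so checkpointing must be enabled for output to become visible.

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class S3SinkSketch {

    // Attaches a row-format StreamingFileSink that writes UTF-8 strings to an S3 prefix.
    // The bucket name and prefix are placeholders.
    public static void addS3Sink(DataStream<String> input) {
        StreamingFileSink<String> s3Sink = StreamingFileSink
                .forRowFormat(new Path("s3://amzn-s3-demo-bucket/flink-output/"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .build();

        input.addSink(s3Sink);
    }
}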

Firehose

FlinkKinesisFirehoseProducer is a reliable, scalable Apache Flink sink for storing application output using the Firehose service. This section describes how to set up a Maven project to create and use a FlinkKinesisFirehoseProducer.

Create a FlinkKinesisFirehoseProducer

The following code example demonstrates how to create a FlinkKinesisFirehoseProducer:

Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink =
        new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

FlinkKinesisFirehoseProducer code example

The following code example demonstrates how to create and configure a FlinkKinesisFirehoseProducer and send data from an Apache Flink data stream to the Firehose service.

package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

    private static final String region = "us-east-1";
    private static final String inputStreamName = "ExampleInputStream";
    private static final String outputStreamName = "ExampleOutputStream";

    // Creates a Kinesis Data Streams source using the static configuration defined above.
    private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
        Properties inputProperties = new Properties();
        inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
        inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
    }

    // Creates a Kinesis Data Streams source using the application's runtime properties.
    private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
            throws IOException {
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
                applicationProperties.get("ConsumerConfigProperties")));
    }

    private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
        /*
         * com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants
         * lists all of the properties that the Firehose sink can be configured with.
         */
        Properties outputProperties = new Properties();
        outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

        return new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
    }

    private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties()
            throws IOException {
        /*
         * com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants
         * lists all of the properties that the Firehose sink can be configured with.
         */
        Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
        return new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(),
                applicationProperties.get("ProducerConfigProperties"));
    }

    public static void main(String[] args) throws Exception {
        // Set up the streaming execution environment.
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         * If you would like to use runtime configuration properties, uncomment the line below:
         * DataStream<String> input = createSourceFromApplicationProperties(env);
         */
        DataStream<String> input = createSourceFromStaticConfig(env);

        // Kinesis Firehose sink
        input.addSink(createFirehoseSinkFromStaticConfig());

        // If you would like to use runtime configuration properties, uncomment the line below:
        // input.addSink(createFirehoseSinkFromApplicationProperties());

        env.execute("Flink Streaming Java API Skeleton");
    }
}

For a complete tutorial about how to use the Firehose sink, see Example: Writing to Firehose.