Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
向适用于 Apache Flink 的托管服务添加流数据源
Apache Flink 提供连接器以从文件、套接字、集合和自定义源中读取。在应用程序代码中,您可以使用 Apache Flink 源
Kinesis Data Streams
FlinkKinesisConsumer
源从 Amazon Kinesis 数据流中向应用程序提供流数据。
创建FlinkKinesisConsumer
以下代码示例说明了如何创建 FlinkKinesisConsumer
:
Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
有关使用 FlinkKinesisConsumer
的更多信息,请参阅下载并查看 Apache Flink 流式传输 Java 代码。
创建使用 EFO 消费端的FlinkKinesisConsumer
FlinkKinesisConsumer 现在支持增强型扇出 (EF
如果 Kinesis 使用者使用 EFO,则 Kinesis Data Streams 服务会为其提供自己的专用带宽,而不是让其与从流中读取数据的其他使用者共享流的固定带宽。
有关在 Kinesis 消费端上使用 EFO 的更多信息,请参阅 FLIP-128:Kinesis 消费者的增强型扇出 AWS
您可以通过在 Kinesis 使用者上设置以下参数来启用 EFO 使用者:
RECORD_PUBLISHER_TYPE:将此参数设置为 EFO,让您的应用程序使用 EFO 使用者访问 Kinesis 数据流数据。
EFO_CONSUMER_NAME:将此参数设置为该流使用者中的唯一字符串值。在同一 Kinesis 数据流中重复使用使用者名称,会导致之前使用该名称的使用者被终止。
要将 a 配置FlinkKinesisConsumer
为使用 EFO,请向消费端添加以下参数:
consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");
有关使用 EFO 消费端的 Managed Service for Apache Flink 应用程序的示例,请参阅。EFO 使用者
Amazon MSK
KafkaSource
源从 Amazon MSK 主题向您的应用程序提供流数据。
创建KafkaSource
以下代码示例说明了如何创建 KafkaSource
:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
有关使用 KafkaSource
的更多信息,请参阅MSK 复制。