Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。
本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將串流資料來源新增至 Apache Flink 的受管理服務
Apache Flink 提供了連接器,用於從檔案、通訊端、集合和自訂來源讀取資料。在應用程式的程式碼中,您可以使用 Apache Flink 來源
Kinesis 資料串流
此 FlinkKinesisConsumer
來源將來自 Amazon Kinesis 資料串流的串流資料提供給應用程式。
建立FlinkKinesisConsumer
以下程式碼範例示範如何建立 FlinkKinesisConsumer
:
Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
如需如何使用 FlinkKinesisConsumer
的詳細資訊,請參閱 下載並檢查阿帕奇 Flink 流 Java 代碼。
建立使用增強型扇出 (EFO) 取用者的 FlinkKinesisConsumer
FlinkKinesisConsumer 現在支援增強型扇出 (EFO)
如果 Kinesis 取用者使用 EFO,Kinesis Data Streams 服務會提供專屬頻寬,而不是讓取用者與其他從串流讀取的取用者共用串流的固定頻寬。
如需將 EFO 與 Kinesis 消費者搭配使用的詳細資訊,請參閱 FLIP-128:適用於 Kinesis 動消費者的增強型扇出 AWS
您可以在 Kinesis 取用者上設定下列參數來啟用 EFO 取用者:
RECORD_PUBLISHER_TYPE:將此參數設定為 EFO,以便讓應用程式使用 EFO 取用者來存取 Kinesis 資料串流資料。
EFO_CONSUMER_NAME:將此參數設定為字串值,確保在此串流的取用者中保持唯一。在相同的 Kinesis 資料串流中重複使用取用者名稱,將導致先前使用該名稱的使用者遭到終止。
若要設定 FlinkKinesisConsumer
使用 EFO,請將下列參數新增至取用者:
consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");
如需使用 EFO 取用者之 Managed Service for Apache Flink 應用程式的範例,請參閱 增強型扇出 (EFO) 取用者。
Amazon MSK
此 KafkaSource
來源將來自 Amazon MSK 主題的串流資料提供給應用程式。
建立KafkaSource
以下程式碼範例示範如何建立 KafkaSource
:
KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
如需如何使用 KafkaSource
的詳細資訊,請參閱 MSK 複寫。