將串流資料來源新增至 Apache Flink 的受管理服務 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將串流資料來源新增至 Apache Flink 的受管理服務

Apache Flink 提供了連接器,用於從檔案、通訊端、集合和自訂來源讀取資料。在應用程式的程式碼中,您可以使用 Apache Flink 來源接收來自串流的資料。本節說明可用於 Amazon 服務的來源

Kinesis 資料串流

FlinkKinesisConsumer 來源將來自 Amazon Kinesis 資料串流的串流資料提供給應用程式。

建立FlinkKinesisConsumer

以下程式碼範例示範如何建立 FlinkKinesisConsumer

Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); DataStream<string> input = env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

如需如何使用 FlinkKinesisConsumer 的詳細資訊,請參閱 下載並檢查阿帕奇 Flink 流 Java 代碼

建立使用增強型扇出 (EFO) 取用者的 FlinkKinesisConsumer

FlinkKinesisConsumer 現在支援增強型扇出 (EFO)

如果 Kinesis 取用者使用 EFO,Kinesis Data Streams 服務會提供專屬頻寬,而不是讓取用者與其他從串流讀取的取用者共用串流的固定頻寬。

如需將 EFO 與 Kinesis 消費者搭配使用的詳細資訊,請參閱 FLIP-128:適用於 Kinesis 動消費者的增強型扇出 AWS

您可以在 Kinesis 取用者上設定下列參數來啟用 EFO 取用者:

  • RECORD_PUBLISHER_TYPE:將此參數設定為 EFO,以便讓應用程式使用 EFO 取用者來存取 Kinesis 資料串流資料。

  • EFO_CONSUMER_NAME:將此參數設定為字串值,確保在此串流的取用者中保持唯一。在相同的 Kinesis 資料串流中重複使用取用者名稱,將導致先前使用該名稱的使用者遭到終止。

若要設定 FlinkKinesisConsumer 使用 EFO,請將下列參數新增至取用者:

consumerConfig.putIfAbsent(RECORD_PUBLISHER_TYPE, "EFO"); consumerConfig.putIfAbsent(EFO_CONSUMER_NAME, "basic-efo-flink-app");

如需使用 EFO 取用者之 Managed Service for Apache Flink 應用程式的範例,請參閱 增強型扇出 (EFO) 取用者

Amazon MSK

KafkaSource 來源將來自 Amazon MSK 主題的串流資料提供給應用程式。

建立KafkaSource

以下程式碼範例示範如何建立 KafkaSource

KafkaSource<String> source = KafkaSource.<String>builder() .setBootstrapServers(brokers) .setTopics("input-topic") .setGroupId("my-group") .setStartingOffsets(OffsetsInitializer.earliest()) .setValueOnlyDeserializer(new SimpleStringSchema()) .build(); env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

如需如何使用 KafkaSource 的詳細資訊,請參閱 MSK 複寫