表格 API 連接器 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

表格 API 連接器

在 Apache Flink 程式設計模型中,連接器是應用程式用來從外部來源 (例如其他 AWS 服務) 讀取或寫入資料的元件。

透過 Apache Flink 資料表 API,您可以使用下列類型的連接器:

  • 表格 API 來源:您可以使用資料表 API 來源連接器以及 API 呼叫或 SQL 查詢,在 TableEnvironment 中建立資料表。

  • 表格 API 接收器:您可以使用 SQL 命令將資料表資料寫入外部來源,例如 Amazon MSK 主題或 Amazon S3 儲存貯體。

表格 API 來源

您可以從資料串流建立資料表來源。下列程式碼會從 Amazon MSK 主題建立資料表:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

如需有關資料表來源的詳細資訊,請參閱 Apache Flink 文件中的資料表和 SQL 連接器

表格 API 接收器

若要將資料表資料寫入接收器,請在 SQL 中建立接收器,然後在 StreamTableEnvironment 物件上執行 SQL 型接收器。

下列程式碼範例示範如何將資料表資料寫入 Amazon S3 接收器:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

您可以使用 format 參數來控制 Managed Service for Apache Flink 用來將輸出寫入接收器的格式。如需格式的相關資訊,請參閱 Apache Flink 文件中的支援連接器

使用者定義來源和匯

您可以使用現有的 Apache Kafka 連接器與其他 AWS 服務 (例如 Amazon MSK 和 Amazon S3) 之間相互傳送資料。若要與其他資料來源和目的地互動,您可以定義自己的來源和接收器。如需詳細資訊,請參閱 Apache Flink 文件中的使用者定義來源和接收器