Verwenden Sie Table-Konnektoren API - Managed Service für Apache Flink

Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden Sie Table-Konnektoren API

Im Apache Flink-Programmiermodell sind Konnektoren Komponenten, die Ihre Anwendung verwendet, um Daten aus externen Quellen, z. B. anderen AWS Diensten, zu lesen oder zu schreiben.

Mit der Apache Flink Table API können Sie die folgenden Arten von Konnektoren verwenden:

  • APITabellenquellen: Sie verwenden API Tabellenquellkonnektoren, um Tabellen in Ihrem System zu erstellen, TableEnvironment indem Sie entweder API Aufrufe oder SQL Abfragen verwenden.

  • Der Tisch sinkt API: Sie verwenden SQL Befehle, um Tabellendaten in externe Quellen wie ein MSK Amazon-Thema oder einen Amazon S3-Bucket zu schreiben.

APITabellenquellen

Sie erstellen eine Tabellenquelle aus einem Datenstrom. Der folgende Code erstellt eine Tabelle aus einem MSK Amazon-Thema:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

Weitere Informationen zu Tabellenquellen finden Sie unter Table & SQL Connectors in der Apache Flink-Dokumentation.

Der Tisch sinkt API

Um Tabellendaten in eine Senke zu schreiben, erstellen Sie die Senke in SQL und führen dann die SQL basierte Senke für das StreamTableEnvironment Objekt aus.

Das folgende Codebeispiel zeigt, wie Tabellendaten in eine Amazon-S3-Senke geschrieben werden:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

Sie können den format-Parameter verwenden, um zu steuern, welches Format Managed Service für Apache Flink verwendet, um die Ausgabe in die Senke zu schreiben. Informationen zu Formaten finden Sie in der Apache Flink-Dokumentation unter Unterstützte Konnektoren.

Benutzerdefinierte Quellen und Senken

Sie können vorhandene Apache Kafka-Konnektoren verwenden, um Daten zu und von anderen AWS Diensten wie Amazon MSK und Amazon S3 zu senden. Für die Interaktion mit anderen Datenquellen und -zielen können Sie Ihre eigenen Quellen und Senken definieren. Weitere Informationen finden Sie unter Benutzerdefinierte Quellen und Senken in der Apache Flink-Dokumentation.