Connecteurs d'API de table - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Connecteurs d'API de table

Dans le modèle de programmation Apache Flink, les connecteurs sont des composants que votre application utilise pour lire ou écrire des données provenant de sources externes, telles que d'autres AWS services.

Avec l’API de table Apache Flink, vous pouvez utiliser les types de connecteurs suivants :

  • Sources d'API du tableau : vous utilisez les connecteurs source de l’API de table pour créer des tables dans votre TableEnvironment à l’aide d’appels d’API ou de requêtes SQL.

  • Réservoirs d'API de table : vous utilisez des commandes SQL pour écrire des données de table dans des sources externes telles qu’une rubrique Amazon MSK ou un compartiment Amazon S3.

Sources d'API du tableau

Vous créez une source de table à partir d’un flux de données. Le code suivant crée une table à partir d’une rubrique Amazon MSK :

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

Pour plus d'informations sur les sources de tables, consultez la section Connecteurs de table et SQL dans la documentation d'Apache Flink.

Réservoirs d'API de table

Pour écrire des données de table dans un récepteur, vous créez le récepteur en SQL, puis vous exécutez le récepteur basé sur SQL sur l’objet StreamTableEnvironment.

L’exemple de code suivant illustre comment écrire des données de table sur un récepteur Amazon S3 :

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

Vous pouvez utiliser le paramètre format pour contrôler le format utilisé par le service géré pour Apache Flink pour écrire la sortie sur le récepteur. Pour plus d'informations sur les formats, consultez la section Connecteurs pris en charge dans la documentation d'Apache Flink.

Sources et récepteurs définis par l'utilisateur

Vous pouvez utiliser les connecteurs Apache Kafka existants pour envoyer des données vers et depuis d’autres services AWS , tels qu’Amazon MSK et Amazon S3. Pour interagir avec d’autres sources de données et destinations, vous pouvez définir vos propres sources et récepteurs. Pour plus d'informations, consultez la section Sources et récepteurs définis par l'utilisateur dans la documentation d'Apache Flink.