Usa i connettori Table API - Servizio gestito per Apache Flink

Il servizio gestito da Amazon per Apache Flink era precedentemente noto come Analisi dei dati Amazon Kinesis per Apache Flink.

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Usa i connettori Table API

Nel modello di programmazione Apache Flink, i connettori sono componenti utilizzati dall'applicazione per leggere o scrivere dati da fonti esterne, come altri AWS servizi.

Con la tabella Apache FlinkAPI, è possibile utilizzare i seguenti tipi di connettori:

  • Sorgenti delle tabelle API: Utilizzate i connettori Table API Source per creare tabelle all'interno dell'utente TableEnvironment utilizzando API chiamate o SQL interrogazioni.

  • Lavandini da tavolo API: usi SQL i comandi per scrivere dati di tabella su fonti esterne come un MSK argomento Amazon o un bucket Amazon S3.

Sorgenti delle tabelle API

Crei un'origine di tabella da un flusso di dati. Il codice seguente crea una tabella da un MSK argomento di Amazon:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

Per ulteriori informazioni sui sorgenti delle tabelle, consulta Table & SQL Connectors nella documentazione di Apache Flink.

Lavandini da tavolo API

Per scrivere i dati della tabella su un sink, create il sink in SQL e quindi eseguite il sink SQL basato sull'StreamTableEnvironmentoggetto.

Nell'esempio di codice seguente viene mostrato come scrivere dati di tabella su un sink Amazon S3:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

Puoi utilizzare il parametro format per controllare il formato impiegato dal servizio gestito per Apache Flink per scrivere l'output nel sink. Per informazioni sui formati, consulta Connettori supportati nella documentazione di Apache Flink.

Sorgenti e sink definiti dall'utente

Puoi utilizzare i connettori Apache Kafka esistenti per inviare dati da e verso altri AWS servizi, come Amazon e Amazon MSK S3. Per interagire con altre origini e destinazioni dati, puoi definire fonti e sink personalizzati. Per ulteriori informazioni, consulta Sources and Sinks definiti dall'utente nella documentazione di Apache Flink.