Use API conectores de mesa - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Use API conectores de mesa

No modelo de programação do Apache Flink, os conectores são componentes que seu aplicativo usa para ler ou gravar dados de fontes externas, como outros serviços. AWS

Com a tabela Apache FlinkAPI, você pode usar os seguintes tipos de conectores:

  • APIFontes da tabela: você usa conectores API de origem de tabela para criar tabelas dentro de você TableEnvironment usando API chamadas ou SQL consultas.

  • APIPias de mesa: Você usa SQL comandos para gravar dados de tabela em fontes externas, como um MSK tópico da Amazon ou um bucket do Amazon S3.

APIFontes da tabela

Você cria uma fonte de tabela a partir de um fluxo de dados. O código a seguir cria uma tabela a partir de um MSK tópico da Amazon:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

Para obter mais informações sobre fontes de tabelas, consulte Tabela e SQL conectores na documentação do Apache Flink.

APIPias de mesa

Para gravar dados da tabela em um coletor, você cria o coletor eSQL, em seguida, executa o coletor SQL baseado no StreamTableEnvironment objeto.

O exemplo de código a seguir demonstra como gravar dados de tabela em um coletor do Amazon S3:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

Você pode usar o parâmetro format para controlar qual formato o Managed Service for Apache Flink usa para gravar a saída no coletor. Para obter informações sobre formatos, consulte Conectores compatíveis na documentação do Apache Flink.

Fontes e coletores definidos pelo usuário

Você pode usar os conectores Apache Kafka existentes para enviar dados de e para outros AWS serviços, como Amazon e Amazon MSK S3. Para interagir com outras fontes de dados e destinos, você pode definir suas próprias fontes e coletores. Para obter mais informações, consulte Fontes e coletores definidos pelo usuário na documentação do Apache Flink.