Conectores API de mesa - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Conectores API de mesa

En el modelo de programación de Apache Flink, los conectores son componentes que la aplicación utiliza para leer o escribir datos de fuentes externas, como otros AWS servicios.

Con la API de tabla de Apache Flink, se pueden usar los siguientes tipos de conectores:

  • Tabla: fuentes de API: se utilizan los conectores fuente de la API de tabla para crear tablas dentro de su TableEnvironment mediante solicitudes a la API o, bien, consultas SQL.

  • Tabla: sumideros de API: se utilizan comandos SQL para escribir datos de tablas en fuentes externas, como un tema de Amazon MSK o un bucket de Amazon S3.

Tabla: fuentes de API

Se crea una fuente de tabla a partir de un flujo de datos. El siguiente código crea una tabla a partir de un tema de Amazon MSK:

//create the table final FlinkKafkaConsumer<StockRecord> consumer = new FlinkKafkaConsumer<StockRecord>(kafkaTopic, new KafkaEventDeserializationSchema(), kafkaProperties); consumer.setStartFromEarliest(); //Obtain stream DataStream<StockRecord> events = env.addSource(consumer); Table table = streamTableEnvironment.fromDataStream(events);

Para obtener más información sobre las fuentes de tablas, consulte los conectores de tabla y SQL en la documentación de Apache Flink.

Tabla: sumideros de API

Para escribir datos de tabla en un receptor, se crea el receptor en SQL y, a continuación, se ejecuta el receptor basado en SQL en el objeto StreamTableEnvironment.

En el siguiente código de ejemplo, se muestra cómo escribir datos de tablas a un receptor de Amazon S3:

final String s3Sink = "CREATE TABLE sink_table (" + "event_time TIMESTAMP," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ")" + " PARTITIONED BY (ticker,dt,hr)" + " WITH" + "(" + " 'connector' = 'filesystem'," + " 'path' = '" + s3Path + "'," + " 'format' = 'json'" + ") "; //send to s3 streamTableEnvironment.executeSql(s3Sink); filteredTable.executeInsert("sink_table");

Puede usar el parámetro format para controlar el formato que Managed Service para Apache Flink utiliza para escribir el resultado en el receptor. Para obtener información sobre los formatos, consulte los conectores compatibles en la documentación de Apache Flink.

Fuentes y sumideros definidos por el usuario

Puede usar los conectores Apache Kafka existentes para enviar datos a otros servicios de AWS desde estos, como Amazon MSK y Amazon S3. Para interactuar con otros orígenes y destinos de datos, puede definir sus propios orígenes y receptores. Para obtener más información, consulte Fuentes y receptores definidos por el usuario en la documentación de Apache Flink.