Menulis data menggunakan sink di Managed Service untuk Apache Flink - Layanan Terkelola untuk Apache Flink

Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menulis data menggunakan sink di Managed Service untuk Apache Flink

Dalam kode aplikasi Anda, Anda dapat menggunakan konektor sink Apache Flink untuk menulis ke sistem eksternal, termasuk AWS layanan, seperti Kinesis Data Streams dan DynamoDB.

Apache Flink juga menyediakan sink untuk file dan soket, dan Anda dapat menerapkan sink kustom. Di antara beberapa wastafel yang didukung, berikut ini sering digunakan:

Gunakan aliran data Kinesis

Apache Flink memberikan informasi tentang Konektor Kinesis Data Streams di dokumentasi Apache Flink.

Untuk contoh aplikasi yang menggunakan Kinesis data stream untuk input dan output, lihat Tutorial: Mulai menggunakan Layanan Terkelola DataStream API di Apache Flink.

Gunakan Apache Kafka dan Amazon Managed Streaming untuk Apache Kafka () MSK

Konektor Apache Flink Kafka memberikan dukungan ekstensif untuk mempublikasikan data ke Apache Kafka dan AmazonMSK, termasuk jaminan yang tepat sekali. Untuk mempelajari cara menulis ke Kafka, lihat contoh Konektor Kafka dalam dokumentasi Apache Flink.

Gunakan Amazon S3

Anda dapat menggunakan Apache Flink StreamingFileSink untuk menulis objek ke bucket Amazon S3.

Untuk contoh tentang cara menulis objek ke S3, lihat Contoh: Menulis ke ember Amazon S3.

Gunakan Firehose

FlinkKinesisFirehoseProducerIni adalah wastafel Apache Flink yang andal dan dapat diskalakan untuk menyimpan output aplikasi menggunakan layanan Firehose. Bagian ini menjelaskan cara menyiapkan proyek Maven untuk membuat dan menggunakan FlinkKinesisFirehoseProducer.

Buat FlinkKinesisFirehoseProducer

Contoh kode berikut mendemonstrasikan pembuatan FlinkKinesisFirehoseProducer:

Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

Contoh Kode FlinkKinesisFirehoseProducer

Contoh kode berikut menunjukkan cara membuat dan mengkonfigurasi FlinkKinesisFirehoseProducer dan mengirim data dari aliran data Apache Flink ke layanan Firehose.

package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }

Untuk tutorial lengkap tentang cara menggunakan wastafel Firehose, lihat. Contoh: Menulis ke Firehose