Schreiben Sie Daten mithilfe von Senken in Managed Service für Apache Flink - Managed Service für Apache Flink

Amazon Managed Service für Apache Flink war zuvor als Amazon Kinesis Data Analytics für Apache Flink bekannt.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Schreiben Sie Daten mithilfe von Senken in Managed Service für Apache Flink

In Ihrem Anwendungscode können Sie jeden Apache Flink-Sink-Connector verwenden, um in externe Systeme zu schreiben, einschließlich AWS Dienste wie Kinesis Data Streams und DynamoDB.

Apache Flink bietet auch Senken für Dateien und Sockets, und Sie können benutzerdefinierte Senken implementieren. Unter den verschiedenen unterstützten Senken werden häufig die folgenden verwendet:

Verwenden Sie Kinesis-Datenstreams

Apache Flink bietet Informationen zum Kinesis Data Streams-Konnector in der Apache Flink-Dokumentation.

Ein Beispiel für eine Anwendung, die einen Kinesis Data Stream für Eingabe und Ausgabe verwendet, finden Sie unter Tutorial: Erste Schritte mit dem DataStream API integrierten Managed Service für Apache Flink.

Verwenden Sie Apache Kafka und Amazon Managed Streaming for Apache Kafka () MSK

Der Apache Flink Kafka Connector bietet umfassende Unterstützung für die Veröffentlichung von Daten auf Apache Kafka und AmazonMSK, einschließlich Exactly-Once-Garantien. Informationen zum Schreiben in Kafka finden Sie in den Beispielen für Kafka Connectors in der Apache Flink-Dokumentation.

Verwenden Sie Amazon S3

Sie können Apache Flink StreamingFileSink verwenden, um Objekte in einen Amazon S3-Bucket zu schreiben.

Ein Beispiel dafür, wie man Objekte in S3 schreibt, finden Sie unter Beispiel: In einen Amazon S3 S3-Bucket schreiben.

Benutze Firehose

Das FlinkKinesisFirehoseProducer ist eine zuverlässige, skalierbare Apache Flink-Senke zum Speichern von Anwendungsausgaben mithilfe des Firehose-Dienstes. In diesem Abschnitt wird die Einrichtung eines Maven-Projekts beschrieben, um einen FlinkKinesisFirehoseProducer zu erstellen und zu verwenden.

Erstellen eines FlinkKinesisFirehoseProducer

Das folgende Code-Beispiel zeigt das Erstellen eines FlinkKinesisFirehoseProducer:

Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);

FlinkKinesisFirehoseProducer-Codebeispiel

Das folgende Codebeispiel zeigt, wie Sie einen Apache Flink-Datenstream erstellen FlinkKinesisFirehoseProducer und konfigurieren und Daten von diesem an den Firehose senden.

package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants; import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties); ProducerConfigConstants config = new ProducerConfigConstants(); return sink; } private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException { /* * com.amazonaws.services.kinesisanalytics.flink.connectors.config. * ProducerConfigConstants * lists of all of the properties that firehose sink can be configured with. */ Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); // Kinesis Firehose sink input.addSink(createFirehoseSinkFromStaticConfig()); // If you would like to use runtime configuration properties, uncomment the // lines below // input.addSink(createFirehoseSinkFromApplicationProperties()); env.execute("Flink Streaming Java API Skeleton"); } }

Ein vollständiges Tutorial zur Verwendung der Firehose-Spüle finden Sie unterBeispiel: An Firehose schreiben.