Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Verwenden Sie einen Delta Lake-Cluster mit Flink
Mit Amazon-EMR-Version 6.11 und höher können Sie Delta Lake mit Ihrem Flink-Cluster verwenden. In den folgenden Beispielen wird der verwendet AWS CLI , um mit Delta Lake auf einem Amazon EMR Flink-Cluster zu arbeiten.
Anmerkung
Amazon EMR unterstützt die DataStream Flink-API, wenn Sie Delta Lake mit einem Flink-Cluster verwenden.
Einen Delta-Lake-Cluster erstellen
-
Erstellen Sie eine Datei,
delta_configurations.json
, mit folgendem Inhalt:[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
-
Erstellen Sie einen Cluster mit der folgenden Konfiguration. Ersetzen Sie
example Amazon S3 bucket path
undsubnet ID
durch Ihre eigenen Werte.aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
Eine Flink-Yarn-Sitzung installieren
Um eine Flink-Yarn-Sitzung zu initialisieren, führen Sie den folgenden Befehl aus:
flink-yarn-session -d
Einen Flink-Auftrag mit Delta Lake erstellen
Die folgenden Beispiele zeigen, wie Sie mit SBT oder Maven für die Erstellung Ihres Flink-Auftrags mit Delta Lake verwenden.
sbt
libraryDependencies ++= Seq(
"io.delta" %% "delta-flink" % deltaConnectorsVersion
% "provided",
"io.delta" %% "delta-standalone" % deltaConnectorsVersion
% "provided",
"org.apache.flink" %% "flink-clients" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-parquet" % flinkVersion
% "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion
% "provided",
"org.apache.flink" % "flink-table-common" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-table-runtime" % flinkVersion
% "provided")
Mit der Flink-Datastream-API in eine Delta-Tabelle schreiben
Verwenden Sie das folgende Beispiel, um eine DeltaSink zu erstellen, mit der Sie in die Tabelle schreiben können deltaTablePath:
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
Configuration configuration = new Configuration();
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new org.apache.flink.core.fs.Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Mit der Flink-Datastream-API aus einer Delta-Tabelle lesen
Verwenden Sie das folgende Beispiel, um eine Grenze zu erstellen DeltaSource , die aus der Tabelle mit einem gelesen werden soll deltaTablePath:
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
Configuration configuration = new Configuration();
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new org.apache.flink.core.fs.Path(deltaTablePath
),
configuration)
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Sink-Erstellung mit Multi-Cluster-Unterstützung für Delta Lake Standalone
Verwenden Sie das folgende Beispiel, um eine Tabelle DeltaSink zum Schreiben in eine Tabelle mit A deltaTablePath
- und Multi-Cluster-Unterstützung
public DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath) {
Configuration configuration = new Configuration();
configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log
");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1
");
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Den Flink-Auftrag ausführen
Den folgenden Befehl verwenden, um Ihren Job auszuführen:
flink run FlinkJob.jar