Utilizzo di un cluster Delta Lake con Flink - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo di un cluster Delta Lake con Flink

A partire da Amazon EMR rilascio 6.11, puoi utilizzare Delta Lake con il cluster Flink. Gli esempi seguenti lo utilizzano AWS CLI per lavorare con Delta Lake su un cluster Amazon EMR Flink.

Nota

Amazon EMR supporta l' DataStream API Flink quando usi Delta Lake con un cluster Flink.

Creazione di un cluster Delta Lake

  1. Creare un file, delta_configurations.json, con i seguenti contenuti:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
  2. Crea un cluster con la seguente configurazione. Sostituisci il example Amazon S3 bucket path e l'subnet ID con i valori effettivi.

    aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

Per inizializzare una sessione Flink yarn, esegui il comando riportato di seguito:

flink-yarn-session -d

Gli esempi seguenti mostrano come usare sbt o Maven per compilare il processo Flink con Delta Lake.

sbt

sbt è uno strumento di compilazione per Scala che puoi usare con poca o nessuna configurazione quando disponi di piccoli progetti.

libraryDependencies ++= Seq( "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided", "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided", "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided", "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.apache.flink" % "flink-table-common" % flinkVersion % "provided", "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
Maven

Maven è uno strumento di automazione della compilazione open-source della Apache Software Foundation. Con Maven, puoi compilare, pubblicare e distribuire un processo Flink con Delta Lake su Amazon EMR.

<project> <properties> <scala.main.version>2.12</scala.main.version> <delta-connectors-version>0.6.0</delta-connectors-version> <flink-version>1.16.1</flink-version> <hadoop-version>3.1.0</hadoop-version> </properties> <dependencies> <dependency> <groupId>io.delta</groupId> <artifactId>delta-flink</artifactId> <version>$delta-connectors-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-standalone_$scala-main-version</artifactId> <version>$delta-connectors-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-parquet</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>$hadoop-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> </dependencies>

Usa l'esempio seguente per creare un oggetto DeltaSink da scrivere sulla tabella con un deltaTablePath:

public static DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath, RowType rowType) { Configuration configuration = new Configuration(); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new org.apache.flink.core.fs.Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }

Utilizzate l'esempio seguente per creare un valore limitato DeltaSource da leggere dalla tabella con un deltaTablePath:

public static DataStream<RowData> createBoundedDeltaSourceAllColumns( StreamExecutionEnvironment env, String deltaTablePath) { Configuration configuration = new Configuration(); DeltaSource<RowData> deltaSource = DeltaSource .forBoundedRowData( new org.apache.flink.core.fs.Path(deltaTablePath), configuration) .build(); return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); }

Creazione di sink con supporto multi-cluster per Delta Lake standalone

Usa l'esempio seguente per DeltaSink creare una tabella su cui scrivere con supporto a deltaTablePath e multicluster:

public DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath) { Configuration configuration = new Configuration(); configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1"); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }

Per eseguire il processo, utilizza il comando seguente:

flink run FlinkJob.jar