Utilizzo di un cluster Delta Lake con Flink
A partire da Amazon EMR rilascio 6.11, puoi utilizzare Delta Lake con il cluster Flink. Negli esempi seguenti, viene utilizzata la AWS CLI per lavorare con Delta Lake su un cluster Flink di Amazon EMR.
Nota
Amazon EMR supporta l'API Flink DataStream quando usi Delta Lake con un cluster Flink.
Creazione di un cluster Delta Lake
-
Creare un file,
delta_configurations.json
, con i seguenti contenuti:[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
-
Crea un cluster con la seguente configurazione. Sostituisci il
example Amazon S3 bucket path
e l'subnet ID
con i valori effettivi.aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://DOC-EXAMPLE-BUCKET/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
Inizializzazione di una sessione Flink yarn
Per inizializzare una sessione Flink yarn, esegui il comando riportato di seguito:
flink-yarn-session -d
Compilazione di un processo Flink con Delta Lake
Gli esempi seguenti mostrano come usare sbt o Maven per compilare il processo Flink con Delta Lake.
Scrittura in una tabella Delta con l'API Flink Datastream
Usa l'esempio seguente per creare un DeltaSink per scrivere nella tabella con un deltaTablePath:
public static DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath, RowType rowType) { Configuration configuration = new Configuration(); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new org.apache.flink.core.fs.Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }
Lettura da una tabella Delta con l'API Flink Datastream
Usa l'esempio seguente per creare un DeltaSource limitato per leggere dalla tabella con un deltaTablePath:
public static DataStream<RowData> createBoundedDeltaSourceAllColumns( StreamExecutionEnvironment env, String deltaTablePath) { Configuration configuration = new Configuration(); DeltaSource<RowData> deltaSource = DeltaSource .forBoundedRowData( new org.apache.flink.core.fs.Path(
deltaTablePath
), configuration) .build(); return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); }
Creazione di sink con supporto multi-cluster per Delta Lake standalone
Usa l'esempio seguente per creare un DeltaSink per scrivere su una tabella con un deltaTablePath
e un supporto multi-cluster
public DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath) { Configuration configuration = new Configuration(); configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "
delta_log
"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1
"); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }
Esecuzione del processo Flink
Per eseguire il processo, utilizza il comando seguente:
flink run FlinkJob.jar