기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Flink와 함께 Delta Lake 클러스터 사용
Amazon EMR 릴리스 6.11 이상에서는 Flink 클러스터와 함께 Delta Lake를 사용할 수 있습니다. 다음 예제에서는 AWS CLI 를 사용하여 Amazon EMR Flink 클러스터에서 Delta Lake로 작업합니다.
참고
Amazon은 Flink DataStream API 클러스터와 함께 Delta Lake를 사용할 때 Flink를 EMR 지원합니다.
Delta Lake 클러스터 생성
-
다음 콘텐츠가 포함된
delta_configurations.json
파일을 생성합니다.[{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
-
다음 구성을 사용하여 클러스터를 생성합니다.
example Amazon S3 bucket path
및subnet ID
를 사용자 정보로 바꿉니다.aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
Flink yarn 세션 초기화
Flink yarn 세션을 초기화하려면 다음 명령을 실행합니다.
flink-yarn-session -d
Delta Lake를 사용하여 Flink 작업 빌드
다음 예제에서는 Delta Lake에서 sbt 또는 Maven을 사용해 Flink 작업을 빌드하는 방법을 보여줍니다.
sbt
libraryDependencies ++= Seq(
"io.delta" %% "delta-flink" % deltaConnectorsVersion
% "provided",
"io.delta" %% "delta-standalone" % deltaConnectorsVersion
% "provided",
"org.apache.flink" %% "flink-clients" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-parquet" % flinkVersion
% "provided",
"org.apache.hadoop" % "hadoop-client" % hadoopVersion
% "provided",
"org.apache.flink" % "flink-table-common" % flinkVersion
% "provided",
"org.apache.flink" %% "flink-table-runtime" % flinkVersion
% "provided")
Flink Datastream을 사용하여 Delta 테이블에 쓰기 API
다음 예제를 사용하여 로 테이블에 쓸 DeltaSink 를 생성합니다. deltaTablePath:
public static DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath,
RowType rowType) {
Configuration configuration = new Configuration();
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new org.apache.flink.core.fs.Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Flink Datastream을 사용하여 델타 테이블에서 읽기 API
다음 예제를 사용하여를 사용하여 테이블에서 읽을 경계 DeltaSource 를 생성합니다. deltaTablePath:
public static DataStream<RowData> createBoundedDeltaSourceAllColumns(
StreamExecutionEnvironment env,
String deltaTablePath) {
Configuration configuration = new Configuration();
DeltaSource<RowData> deltaSource = DeltaSource
.forBoundedRowData(
new org.apache.flink.core.fs.Path(deltaTablePath
),
configuration)
.build();
return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source");
}
Delta Lake 독립 실행형에 대한 다중 클러스터 지원을 통해 싱크 생성
다음 예제를 사용하여 deltaTablePath
및 다중 클러스터 지원을
public DataStream<RowData> createDeltaSink(
DataStream<RowData> stream,
String deltaTablePath) {
Configuration configuration = new Configuration();
configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log
");
configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1
");
DeltaSink<RowData> deltaSink = DeltaSink
.forRowData(
new Path(deltaTablePath),
configuration,
rowType)
.build();
stream.sinkTo(deltaSink);
return stream;
}
Flink 작업 실행
다음 명령을 사용하여 작업을 실행합니다.
flink run FlinkJob.jar