쿠키 기본 설정 선택

당사는 사이트와 서비스를 제공하는 데 필요한 필수 쿠키 및 유사한 도구를 사용합니다. 고객이 사이트를 어떻게 사용하는지 파악하고 개선할 수 있도록 성능 쿠키를 사용해 익명의 통계를 수집합니다. 필수 쿠키는 비활성화할 수 없지만 '사용자 지정' 또는 ‘거부’를 클릭하여 성능 쿠키를 거부할 수 있습니다.

사용자가 동의하는 경우 AWS와 승인된 제3자도 쿠키를 사용하여 유용한 사이트 기능을 제공하고, 사용자의 기본 설정을 기억하고, 관련 광고를 비롯한 관련 콘텐츠를 표시합니다. 필수가 아닌 모든 쿠키를 수락하거나 거부하려면 ‘수락’ 또는 ‘거부’를 클릭하세요. 더 자세한 내용을 선택하려면 ‘사용자 정의’를 클릭하세요.

Flink와 함께 Delta Lake 클러스터 사용 - Amazon EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Flink와 함께 Delta Lake 클러스터 사용

Amazon EMR 릴리스 6.11 이상에서는 Flink 클러스터와 함께 Delta Lake를 사용할 수 있습니다. 다음 예제에서는 AWS CLI 를 사용하여 Amazon EMR Flink 클러스터에서 Delta Lake로 작업합니다.

참고

Amazon은 Flink DataStream API 클러스터와 함께 Delta Lake를 사용할 때 Flink를 EMR 지원합니다.

Delta Lake 클러스터 생성

  1. 다음 콘텐츠가 포함된 delta_configurations.json 파일을 생성합니다.

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
  2. 다음 구성을 사용하여 클러스터를 생성합니다. example Amazon S3 bucket pathsubnet ID를 사용자 정보로 바꿉니다.

    aws emr create-cluster --release-label emr-6.11.0 --applications Name=Flink --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://amzn-s3-demo-bucket/ --instance-type m5.xlarge --instance-count 3 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

Flink yarn 세션을 초기화하려면 다음 명령을 실행합니다.

flink-yarn-session -d

다음 예제에서는 Delta Lake에서 sbt 또는 Maven을 사용해 Flink 작업을 빌드하는 방법을 보여줍니다.

sbt

sbt는 소규모 프로젝트의 경우 거의 또는 전혀 구성하지 않고도 사용할 수 있는 Scala용 빌드 도구입니다.

libraryDependencies ++= Seq( "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided", "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided", "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided", "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.apache.flink" % "flink-table-common" % flinkVersion % "provided", "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")
Maven

Maven은 Apache Software Foundation의 오픈 소스 빌드 자동화 도구입니다. Maven을 사용하면 Amazon에서 Delta Lake로 Flink 작업을 빌드, 게시 및 배포할 수 있습니다EMR.

<project> <properties> <scala.main.version>2.12</scala.main.version> <delta-connectors-version>0.6.0</delta-connectors-version> <flink-version>1.16.1</flink-version> <hadoop-version>3.1.0</hadoop-version> </properties> <dependencies> <dependency> <groupId>io.delta</groupId> <artifactId>delta-flink</artifactId> <version>$delta-connectors-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>io.delta</groupId> <artifactId>delta-standalone_$scala-main-version</artifactId> <version>$delta-connectors-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-parquet</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>$hadoop-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-common</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>$flink-version</version> <scope>provided</scope> </dependency> </dependencies>

sbt는 소규모 프로젝트의 경우 거의 또는 전혀 구성하지 않고도 사용할 수 있는 Scala용 빌드 도구입니다.

libraryDependencies ++= Seq( "io.delta" %% "delta-flink" % deltaConnectorsVersion % "provided", "io.delta" %% "delta-standalone" % deltaConnectorsVersion % "provided", "org.apache.flink" %% "flink-clients" % flinkVersion % "provided", "org.apache.flink" %% "flink-parquet" % flinkVersion % "provided", "org.apache.hadoop" % "hadoop-client" % hadoopVersion % "provided", "org.apache.flink" % "flink-table-common" % flinkVersion % "provided", "org.apache.flink" %% "flink-table-runtime" % flinkVersion % "provided")

다음 예제를 사용하여 로 테이블에 쓸 DeltaSink 를 생성합니다. deltaTablePath:

public static DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath, RowType rowType) { Configuration configuration = new Configuration(); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new org.apache.flink.core.fs.Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }

다음 예제를 사용하여를 사용하여 테이블에서 읽을 경계 DeltaSource 를 생성합니다. deltaTablePath:

public static DataStream<RowData> createBoundedDeltaSourceAllColumns( StreamExecutionEnvironment env, String deltaTablePath) { Configuration configuration = new Configuration(); DeltaSource<RowData> deltaSource = DeltaSource .forBoundedRowData( new org.apache.flink.core.fs.Path(deltaTablePath), configuration) .build(); return env.fromSource(deltaSource, WatermarkStrategy.noWatermarks(), "delta-source"); }

Delta Lake 독립 실행형에 대한 다중 클러스터 지원을 통해 싱크 생성

다음 예제를 사용하여 deltaTablePath다중 클러스터 지원을 사용하여 테이블에 DeltaSink 쓸를 생성합니다.

public DataStream<RowData> createDeltaSink( DataStream<RowData> stream, String deltaTablePath) { Configuration configuration = new Configuration(); configuration.set("spark.delta.logStore.s3.impl", "io.delta.storage.S3DynamoDBLogStore"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.tableName", "delta_log"); configuration.set("spark.io.delta.storage.S3DynamoDBLogStore.ddb.region", "us-east-1"); DeltaSink<RowData> deltaSink = DeltaSink .forRowData( new Path(deltaTablePath), configuration, rowType) .build(); stream.sinkTo(deltaSink); return stream; }

다음 명령을 사용하여 작업을 실행합니다.

flink run FlinkJob.jar
프라이버시사이트 이용 약관쿠키 기본 설정
© 2025, Amazon Web Services, Inc. 또는 계열사. All rights reserved.