쿠키 기본 설정 선택

당사는 사이트와 서비스를 제공하는 데 필요한 필수 쿠키 및 유사한 도구를 사용합니다. 고객이 사이트를 어떻게 사용하는지 파악하고 개선할 수 있도록 성능 쿠키를 사용해 익명의 통계를 수집합니다. 필수 쿠키는 비활성화할 수 없지만 '사용자 지정' 또는 ‘거부’를 클릭하여 성능 쿠키를 거부할 수 있습니다.

사용자가 동의하는 경우 AWS와 승인된 제3자도 쿠키를 사용하여 유용한 사이트 기능을 제공하고, 사용자의 기본 설정을 기억하고, 관련 광고를 비롯한 관련 콘텐츠를 표시합니다. 필수가 아닌 모든 쿠키를 수락하거나 거부하려면 ‘수락’ 또는 ‘거부’를 클릭하세요. 더 자세한 내용을 선택하려면 ‘사용자 정의’를 클릭하세요.

Flink와 함께 Iceberg 클러스터 사용 - Amazon EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Flink와 함께 Iceberg 클러스터 사용

Amazon EMR 버전 6.9.0부터 오픈 소스 Iceberg Flink 통합을 사용할 때 필수 설정 단계 없이도 Flink 클러스터에서 Iceberg를 사용할 수 있습니다.

Iceberg 클러스터 생성

AWS Management Console, AWS CLI또는 Amazon EMR API를 사용하여 Iceberg가 설치된 클러스터를 생성할 수 있습니다. 이 자습서에서는 AWS CLI 를 사용하여 Amazon EMR 클러스터에서 Iceberg로 작업합니다. 콘솔을 사용하여 Iceberg가 설치된 클러스터를 생성하려면 Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue의 단계를 수행합니다.

와 함께 Amazon EMR에서 Iceberg를 사용하려면 AWS CLI먼저 다음 단계를 사용하여 클러스터를 생성합니다. 를 사용하여 Iceberg 분류를 지정하는 방법에 대한 자세한 내용은 클러스터를 생성할 AWS CLI 때를 사용하여 구성 제공 또는 섹션을 AWS CLI참조하세요클러스터를 생성할 때 Java SDK를 사용하여 구성 제공. 다음 콘텐츠로 configurations.json이라는 파일을 생성합니다.

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

그리고 다음과 같은 구성으로 클러스터를 생성하고, 예제 Amazon S3 버킷 경로와 서브넷 ID를 사용자 값으로 대체합니다.

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://amzn-s3-demo-bucket/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

또는 Flink 애플리케이션을 포함하는 Amazon EMR 6.9.0 클러스터를 생성하고 /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar 파일을 Flink 작업에 JAR 종속성으로 포함할 수 있습니다.

SQL 클라이언트 스크립트는 /usr/lib/flink/bin에 있습니다. 다음 명령으로 스크립트를 실행할 수 있습니다.

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

그러면 Flink SQL 쉘이 시작됩니다.

Iceberg 테이블 생성

Flink SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-type'='glue' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

테이블 API

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

Iceberg 테이블에 쓰기

Flink SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

테이블 API

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

데이터스트림 API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

Iceberg 테이블에서 읽기

Flink SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

테이블 API

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

데이터스트림 API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

Hive 카탈로그 사용

Hive 메타스토어 및 Glue 카탈로그를 사용하여 Flink 구성에 설명된 대로 Flink 및 Hive 종속성이 해결되었는지 확인합니다.

Flink에 작업을 제출하는 한 가지 방법은 작업별 Flink YARN 세션을 사용하는 것입니다. 다음 명령을 사용하여 시작할 수 있습니다.

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
  • AWS Glue를 Iceberg의 카탈로그로 사용하는 경우 테이블을 생성하는 데이터베이스가 AWS Glue에 있는지 확인합니다. AWS Lake Formation 와 같은 서비스를 사용하고 있고 카탈로그를 로드할 수 없는 경우 명령을 실행하기 위해 서비스에 대한 적절한 액세스 권한이 있는지 확인합니다.

  • Iceberg Glue 통합은 Redshift Managed Storage 카탈로그에서 작동하지 않습니다.

프라이버시사이트 이용 약관쿠키 기본 설정
© 2025, Amazon Web Services, Inc. 또는 계열사. All rights reserved.