Flink와 함께 Iceberg 클러스터 사용 - Amazon EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Flink와 함께 Iceberg 클러스터 사용

Amazon EMR 버전 6.9.0부터는 오픈 소스 Iceberg Flink 통합을 사용할 때 필요한 설정 단계 없이 Flink 클러스터와 함께 Iceberg를 사용할 수 있습니다.

Iceberg 클러스터 생성

AWS Management Console, AWS CLI또는 Amazon 를 사용하여 Iceberg가 설치된 클러스터를 생성할 수 있습니다EMRAPI. 이 자습서에서는 AWS CLI 를 사용하여 Amazon EMR 클러스터에서 Iceberg로 작업합니다. 콘솔을 사용하여 Iceberg가 설치된 클러스터를 생성하려면 Amazon Amazon Athena EMR및 AWS Glue를 사용하여 Apache Iceberg 데이터 레이크 빌드의 단계를 따릅니다.

EMR 에서 Amazon의 Iceberg를 사용하려면 AWS CLI먼저 다음 단계를 사용하여 클러스터를 생성합니다. 를 사용하여 Iceberg 분류를 지정하는 방법에 대한 자세한 내용은 클러스터를 생성할 AWS CLI 때 를 사용하여 구성 제공 또는 섹션을 AWS CLI참조하세요클러스터를 생성할 SDK 때 Java를 사용하여 구성 제공. 다음 콘텐츠로 configurations.json이라는 파일을 생성합니다.

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

그리고 다음과 같은 구성으로 클러스터를 생성하고, 예제 Amazon S3 버킷 경로와 서브넷 ID를 사용자 값으로 대체합니다.

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://amzn-s3-demo-bucket/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

또는 Flink 애플리케이션이 포함된 Amazon EMR 6.9.0 클러스터를 생성하고 파일을 Flink 작업의 JAR 종속성/usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar으로 사용할 수 있습니다.

SQL 클라이언트 스크립트는 아래에 있습니다/usr/lib/flink/bin. 다음 명령으로 스크립트를 실행할 수 있습니다.

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

이렇게 하면 Flink SQL 쉘이 시작됩니다.

Iceberg 테이블 생성

Flink SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.dynamodb.DynamoDbLockManager', 'lock.table'='myGlueLockTable' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

테이블 API

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

Iceberg 테이블에 쓰기

Flink SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

테이블 API

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

Datastream API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

Iceberg 테이블에서 읽기

Flink SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

테이블 API

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

Datastream API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

Hive 카탈로그 사용

Hive 메타스토어 및 Glue 카탈로그를 사용하여 Flink 구성에 설명된 대로 Flink 및 Hive 종속성이 해결되었는지 확인합니다.

Flink에 작업을 제출하는 한 가지 방법은 작업별 Flink YARN 세션을 사용하는 것입니다. 다음 명령을 사용하여 시작할 수 있습니다.

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME