기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Flink와 함께 Iceberg 클러스터 사용
Amazon EMR 버전 6.9.0부터 오픈 소스 Iceberg Flink 통합을 사용할 때 필수 설정 단계 없이도 Flink 클러스터에서 Iceberg를 사용할 수 있습니다.
Iceberg 클러스터 생성
AWS Management Console, AWS CLI또는 Amazon EMR API를 사용하여 Iceberg가 설치된 클러스터를 생성할 수 있습니다. 이 자습서에서는 AWS CLI 를 사용하여 Amazon EMR 클러스터에서 Iceberg로 작업합니다. 콘솔을 사용하여 Iceberg가 설치된 클러스터를 생성하려면 Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue
와 함께 Amazon EMR에서 Iceberg를 사용하려면 AWS CLI먼저 다음 단계를 사용하여 클러스터를 생성합니다. 를 사용하여 Iceberg 분류를 지정하는 방법에 대한 자세한 내용은 클러스터를 생성할 AWS CLI 때를 사용하여 구성 제공 또는 섹션을 AWS CLI참조하세요클러스터를 생성할 때 Java SDK를 사용하여 구성 제공. 다음 콘텐츠로 configurations.json
이라는 파일을 생성합니다.
[{
"Classification":"iceberg-defaults",
"Properties":{"iceberg.enabled":"true"}
}]
그리고 다음과 같은 구성으로 클러스터를 생성하고, 예제 Amazon S3 버킷 경로와 서브넷 ID를 사용자 값으로 대체합니다.
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Flink \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_flink_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
또는 Flink 애플리케이션을 포함하는 Amazon EMR 6.9.0 클러스터를 생성하고 /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
파일을 Flink 작업에 JAR 종속성으로 포함할 수 있습니다.
Flink SQL 클라이언트 사용
SQL 클라이언트 스크립트는 /usr/lib/flink/bin
에 있습니다. 다음 명령으로 스크립트를 실행할 수 있습니다.
flink-yarn-session -d # starting the Flink YARN Session in detached mode
./sql-client.sh
그러면 Flink SQL 쉘이 시작됩니다.
Flink 예제
Iceberg 테이블 생성
Flink SQL
CREATE CATALOG glue_catalog WITH (
'type'='iceberg',
'warehouse'='<WAREHOUSE>',
'catalog-type'='glue'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS <DB>;
USE <DB>;
CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
테이블 API
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
String warehouse = "<WAREHOUSE>";
String db = "<DB>";
tEnv.executeSql(
"CREATE CATALOG glue_catalog WITH (\n"
+ " 'type'='iceberg',\n"
+ " 'warehouse'='"
+ warehouse
+ "',\n"
+ " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n"
+ " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n"
+ " );");
tEnv.executeSql("USE CATALOG glue_catalog;");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";");
tEnv.executeSql("USE " + db + ";");
tEnv.executeSql(
"CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Iceberg 테이블에 쓰기
Flink SQL
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
테이블 API
tEnv.executeSql(
"INSERT INTO `glue_catalog`.`"
+ db
+ "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
데이터스트림 API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
GenericRowData rowData1 = new GenericRowData(2);
rowData1.setField(0, 1L);
rowData1.setField(1, StringData.fromString("a"));
DataStream<RowData> input = env.fromElements(rowData1);
Map<String, String> props = new HashMap<();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStreamSink<Void> dataStreamSink =
FlinkSink.forRowData(input).tableLoader(tableLoader).append();
env.execute("Datastream Write");
Iceberg 테이블에서 읽기
Flink SQL
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
테이블 API
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
데이터스트림 API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
Map<String, String> props = new HashMap<>();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStream<RowData> batch =
FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
batch.print().name("print-sink");
Hive 카탈로그 사용
Hive 메타스토어 및 Glue 카탈로그를 사용하여 Flink 구성에 설명된 대로 Flink 및 Hive 종속성이 해결되었는지 확인합니다.
Flink 작업 실행
Flink에 작업을 제출하는 한 가지 방법은 작업별 Flink YARN 세션을 사용하는 것입니다. 다음 명령을 사용하여 시작할 수 있습니다.
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
Flink에서 Iceberg 사용 시 고려 사항
-
AWS Glue를 Iceberg의 카탈로그로 사용하는 경우 테이블을 생성하는 데이터베이스가 AWS Glue에 있는지 확인합니다. AWS Lake Formation 와 같은 서비스를 사용하고 있고 카탈로그를 로드할 수 없는 경우 명령을 실행하기 위해 서비스에 대한 적절한 액세스 권한이 있는지 확인합니다.
Iceberg Glue 통합은 Redshift Managed Storage 카탈로그에서 작동하지 않습니다.