本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
將 Iceberg 叢集與 Flink 搭配使用
從 Amazon EMR 6.9.0 版開始,您可以將 Iceberg 與 Flink 叢集搭配使用,而無需使用開放原始碼 Iceberg Flink 整合時所需的設定步驟。
建立 Iceberg 叢集
您可以使用 AWS Management Console、 AWS CLI或 Amazon EMR API 建立已安裝 Iceberg 的叢集。在本教學課程中,您會使用 AWS CLI 在 Amazon EMR 叢集上使用 Iceberg。若要使用主控台建立已安裝 Iceberg 的叢集,請遵循使用 Amazon Athena、Amazon EMR 和 AWS Glue 建置 Apache Iceberg 資料湖
若要在 Amazon EMR 上使用 Iceberg AWS CLI,請先使用下列步驟建立叢集。如需使用 指定 Iceberg 分類的資訊 AWS CLI,請參閱 當您建立叢集 AWS CLI 時,使用 提供組態或 在建立叢集時使用 Java SDK 提供組態。建立稱為 configurations.json
的檔案,其中具有下列內容:
[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
接下來,使用下列組態建立叢集,並將範例 Amazon S3 儲存貯體路徑和子網路 ID 取代為您自己的值:
aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://amzn-s3-demo-bucket/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
或者,您可以建立其中具有 Flink 應用程式的 Amazon EMR 6.9.0 叢集,並將檔案 /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
作為 Flink 作業中的 JAR 依存項目。
使用 Flink SQL 用戶端
SQL 用戶端指令碼位於 /usr/lib/flink/bin
下。您可以使用下列命令執行指令碼:
flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh
這將啟動 Flink SQL Shell。
Flink 範例
建立 Iceberg 資料表
Flink SQL
CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
資料表 API
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
寫入 Iceberg 資料表
Flink SQL
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
資料表 API
tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
資料串流 API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");
從 Iceberg 資料表讀取
Flink SQL
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
資料表 API
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
資料串流 API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");
使用 Hive 型錄
請確保如 將 Flink 與 Hive Metastore 和 Glue Catalog 搭配使用 中所述解析 Flink 和 Hive 依存項目。
執行 Flink 作業
向 Flink 提交作業的一種方法是使用每個作業的 Flink YARN 作業階段。這可以使用下列命令來啟動:
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
將 Iceberg 與 Flink 搭配使用的考量
-
使用 AWS Glue 做為 Iceberg 的目錄時,請確定您在其中建立資料表的資料庫存在於 Glue AWS 中。如果您使用的是 服務, AWS Lake Formation 但無法載入目錄,請確定您有權存取該服務來執行 命令。
Iceberg Glue 整合不適用於 Redshift 受管儲存目錄。