Verwenden Sie einen Iceberg-Cluster mit Flink
Ab Amazon-EMR-Version 6.9.0 können Sie Iceberg mit einem Flink-Cluster verwenden, ohne dass die Einrichtungsschritte erforderlich sind, die bei der Verwendung der Open-Source-Iceberg-Flink-Integration erforderlich sind.
Erstellen Sie einen Iceberg-Cluster
Sie können einen Cluster mit installiertem Iceberg mithilfe der AWS Management Console, der AWS CLI oder der Amazon-EMR-API erstellen. In diesem Tutorial verwenden Sie AWS CLI, um mit Iceberg auf einem Amazon-EMR-Cluster zu arbeiten. Um die Konsole zur Erstellung eines Clusters mit Iceberg zu verwenden, folgen Sie den Schritten unter Ein Data Lake von Apache Iceberg mit Amazon Athena, Amazon EMR und AWS Glue erstellen
Um Iceberg in Amazon EMR mit AWS CLI zu verwenden, erstellen Sie zunächst einen Cluster mit den folgenden Schritten. Informationen zur Spezifizierung der Iceberg-Klassifizierung mithilfe von AWS CLI finden Sie unter Bei der Erstellung eines Clusters eine Konfiguration mit der AWS CLI bereitstellen oder Beim Erstellen eines Clusters eine Konfiguration mit dem Java SDK angeben. Erstellen Sie eine Datei mit dem Namen configurations.json
und dem folgenden Inhalt:
[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
Erstellen Sie als Nächstes einen Cluster mit der folgenden Konfiguration und ersetzen Sie den Amazon-S3-Bucket-Beispielpfad und die Subnetz-ID durch Ihre eigenen Werte:
aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
Alternativ können Sie einen Amazon-EMR-6.9.0-Cluster mit einer Flink-Anwendung darin erstellen und die Datei /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
als JAR-Abhängigkeit in einem Flink-Auftrag verwenden.
Verwenden des Flink-SQL-Clients
Das SQL-Client-Skript befindet sich unter /usr/lib/flink/bin
. Sie können das Skript mit dem folgenden Befehl ausführen:
flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh
Dadurch wird eine Flink-SQL-Shell gestartet.
Flink-Beispiele
Erstellen Sie eine Iceberg-Tabelle
Flink SQL
CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager', 'lock.table'='myGlueLockTable' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
Tabellen-API
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Schreiben Sie an einen Iceberg-Tabelle
Flink SQL
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
Tabellen-API
tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
Datenstream-API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");
Lesen von einer Iceberg-Tabelle
Flink SQL
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
Tabellen-API
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
Datenstream-API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");
Verwenden des Hive-Katalogs
Stellen Sie sicher, dass die Abhängigkeiten zwischen Flink und Hive wie unter Konfigurieren Sie Flink mit Hive Metastore und Glue Catalog beschrieben aufgelöst wurden.
Flink-Auftrag ausführen
Eine Möglichkeit, einen Auftrag an Flink zu senden, besteht darin, eine Flink-YARN-Sitzung pro Auftrag zu verwenden. Dies kann mit dem folgenden Befehl gestartet werden:
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME