Verwenden Sie einen Iceberg-Cluster mit Flink - Amazon EMR

Verwenden Sie einen Iceberg-Cluster mit Flink

Ab Amazon-EMR-Version 6.9.0 können Sie Iceberg mit einem Flink-Cluster verwenden, ohne dass die Einrichtungsschritte erforderlich sind, die bei der Verwendung der Open-Source-Iceberg-Flink-Integration erforderlich sind.

Erstellen Sie einen Iceberg-Cluster

Sie können einen Cluster mit installiertem Iceberg mithilfe der AWS Management Console, der AWS CLI oder der Amazon-EMR-API erstellen. In diesem Tutorial verwenden Sie AWS CLI, um mit Iceberg auf einem Amazon-EMR-Cluster zu arbeiten. Um die Konsole zur Erstellung eines Clusters mit Iceberg zu verwenden, folgen Sie den Schritten unter Ein Data Lake von Apache Iceberg mit Amazon Athena, Amazon EMR und AWS Glue erstellen.

Um Iceberg in Amazon EMR mit AWS CLI zu verwenden, erstellen Sie zunächst einen Cluster mit den folgenden Schritten. Informationen zur Spezifizierung der Iceberg-Klassifizierung mithilfe von AWS CLI finden Sie unter Bei der Erstellung eines Clusters eine Konfiguration mit der AWS CLI bereitstellen oder Beim Erstellen eines Clusters eine Konfiguration mit dem Java SDK angeben. Erstellen Sie eine Datei mit dem Namen configurations.json und dem folgenden Inhalt:

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

Erstellen Sie als Nächstes einen Cluster mit der folgenden Konfiguration und ersetzen Sie den Amazon-S3-Bucket-Beispielpfad und die Subnetz-ID durch Ihre eigenen Werte:

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

Alternativ können Sie einen Amazon-EMR-6.9.0-Cluster mit einer Flink-Anwendung darin erstellen und die Datei /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar als JAR-Abhängigkeit in einem Flink-Auftrag verwenden.

Das SQL-Client-Skript befindet sich unter /usr/lib/flink/bin. Sie können das Skript mit dem folgenden Befehl ausführen:

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

Dadurch wird eine Flink-SQL-Shell gestartet.

Erstellen Sie eine Iceberg-Tabelle

Flink SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager', 'lock.table'='myGlueLockTable' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

Tabellen-API

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

Schreiben Sie an einen Iceberg-Tabelle

Flink SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

Tabellen-API

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

Datenstream-API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

Lesen von einer Iceberg-Tabelle

Flink SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

Tabellen-API

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

Datenstream-API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

Verwenden des Hive-Katalogs

Stellen Sie sicher, dass die Abhängigkeiten zwischen Flink und Hive wie unter Konfigurieren Sie Flink mit Hive Metastore und Glue Catalog beschrieben aufgelöst wurden.

Eine Möglichkeit, einen Auftrag an Flink zu senden, besteht darin, eine Flink-YARN-Sitzung pro Auftrag zu verwenden. Dies kann mit dem folgenden Befehl gestartet werden:

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME