Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Verwenden Sie einen Iceberg-Cluster mit Flink
Ab Amazon-EMR-Version 6.9.0 können Sie Iceberg mit einem Flink-Cluster verwenden, ohne dass die Einrichtungsschritte erforderlich sind, die bei der Verwendung der Open-Source-Iceberg-Flink-Integration erforderlich sind.
Erstellen Sie einen Iceberg-Cluster
Sie können einen Cluster mit installiertem Iceberg mithilfe der AWS Management Console, der AWS CLI oder der Amazon-EMR-API erstellen. In diesem Tutorial verwenden Sie den, AWS CLI um mit Iceberg auf einem Amazon EMR-Cluster zu arbeiten. Um die Konsole zur Erstellung eines Clusters mit Iceberg zu verwenden, folgen Sie den Schritten unter Ein Data Lake von Apache Iceberg mit Amazon Athena, Amazon EMR und AWS Glue erstellen
Um Iceberg auf Amazon EMR mit dem zu verwenden AWS CLI, erstellen Sie zunächst einen Cluster mit den folgenden Schritten. Informationen zur Spezifizierung der Iceberg-Klassifizierung mithilfe von finden Sie unter oder AWS CLI. Geben Sie AWS CLI beim Erstellen eines Clusters eine Konfiguration an, indem Sie Beim Erstellen eines Clusters eine Konfiguration mit dem Java SDK angeben Erstellen Sie eine Datei mit dem Namen configurations.json
und dem folgenden Inhalt:
[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
Erstellen Sie als Nächstes einen Cluster mit der folgenden Konfiguration und ersetzen Sie den Amazon-S3-Bucket-Beispielpfad und die Subnetz-ID durch Ihre eigenen Werte:
aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://amzn-s3-demo-bucket/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
Alternativ können Sie einen Amazon-EMR-6.9.0-Cluster mit einer Flink-Anwendung darin erstellen und die Datei /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
als JAR-Abhängigkeit in einem Flink-Auftrag verwenden.
Verwenden des Flink-SQL-Clients
Das SQL-Client-Skript befindet sich unter /usr/lib/flink/bin
. Sie können das Skript mit dem folgenden Befehl ausführen:
flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh
Dadurch wird eine Flink-SQL-Shell gestartet.
Flink-Beispiele
Erstellen Sie eine Iceberg-Tabelle
Flink SQL
CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
Tabellen-API
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Schreiben Sie an einen Iceberg-Tabelle
Flink SQL
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
Tabellen-API
tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
Datenstream-API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");
Lesen von einer Iceberg-Tabelle
Flink SQL
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
Tabellen-API
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
Datenstream-API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");
Verwenden des Hive-Katalogs
Stellen Sie sicher, dass die Abhängigkeiten zwischen Flink und Hive wie unter Konfigurieren Sie Flink mit Hive Metastore und Glue Catalog beschrieben aufgelöst wurden.
Flink-Auftrag ausführen
Eine Möglichkeit, einen Auftrag an Flink zu senden, besteht darin, eine Flink-YARN-Sitzung pro Auftrag zu verwenden. Dies kann mit dem folgenden Befehl gestartet werden:
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
Überlegungen zur Verwendung von Iceberg mit Flink
-
Wenn Sie AWS Glue als Katalog für Iceberg verwenden, stellen Sie sicher, dass die Datenbank, in der Sie eine Tabelle erstellen, in AWS Glue vorhanden ist. Wenn Sie Dienste wie verwenden AWS Lake Formation und den Katalog nicht laden können, stellen Sie sicher, dass Sie über den richtigen Zugriff auf den Dienst verfügen, um den Befehl auszuführen.
Die Iceberg Glue-Integration funktioniert nicht mit dem Redshift Managed Storage-Katalog.