Utilizzo di un cluster Iceberg con Flink - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo di un cluster Iceberg con Flink

A partire dalla versione 6.9.0 di Amazon EMR, puoi utilizzare Iceberg con un cluster Flink senza i passaggi di configurazione necessari quando si utilizza l'integrazione open source di Iceberg per Flink.

Creazione di un cluster Iceberg

Puoi creare un cluster con Iceberg installato utilizzando la AWS Management Console, la AWS CLI o l'API di Amazon EMR. In questo tutorial, lo utilizzerai AWS CLI per lavorare con Iceberg su un cluster Amazon EMR. Per utilizzare la console per creare un cluster con Iceberg installato, segui la procedura illustrata in Creazione di un data lake Apache Iceberg utilizzando Amazon Athena, Amazon EMR e AWS Glue.

Per utilizzare Iceberg su Amazon EMR con AWS CLI, crea innanzitutto un cluster con i seguenti passaggi. Per informazioni su come specificare la classificazione Iceberg utilizzando il AWS CLI, consulta o. Fornisci una configurazione utilizzando AWS CLI quando crei un cluster Fornitura di una configurazione utilizzando l'SDK Java per la creazione di un cluster Crea un file denominato configurations.json con i seguenti contenuti:

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

Quindi, crea un cluster con la seguente configurazione, sostituendo il percorso del bucket Amazon S3 di esempio e l'ID della sottorete con i tuoi valori:

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

In alternativa, puoi creare un cluster Amazon EMR 6.9.0 che include l'applicazione Flink e utilizza il file /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar come dipendenza JAR in un processo Flink.

Lo script SQL Client si trova in /usr/lib/flink/bin. Puoi eseguire lo script con il comando seguente:

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

Questa operazione avvia una shell (interprete di comandi) Flink SQL.

Creazione di una tabella Iceberg

Flink SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.dynamodb.DynamoDbLockManager', 'lock.table'='myGlueLockTable' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

Tabella API

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

Scrittura su una tabella Iceberg

Flink SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

Tabella API

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

API del flusso di dati

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

Lettura da una tabella Iceberg

Flink SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

Tabella API

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

API del flusso di dati

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

Utilizzo del catalogo Hive

Assicurati che le dipendenze di Flink e Hive siano risolte come descritto in Configurazione di Flink con Hive Metastore e Catalogo Glue.

Un modo per inviare un processo a Flink consiste nell'utilizzare una sessione YARN per il processo Flink. Puoi eseguire l'avvio tramite il comando seguente:

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME