Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Utilizzo di un cluster Iceberg con Flink
A partire dalla versione 6.9.0 di Amazon EMR, puoi utilizzare Iceberg con un cluster Flink senza i passaggi di configurazione necessari quando si utilizza l'integrazione open source di Iceberg per Flink.
Creazione di un cluster Iceberg
Puoi creare un cluster con Iceberg installato utilizzando la AWS Management Console, la AWS CLI o l'API di Amazon EMR. In questo tutorial, lo utilizzerai AWS CLI per lavorare con Iceberg su un cluster Amazon EMR. Per utilizzare la console per creare un cluster con Iceberg installato, segui la procedura illustrata in Creazione di un data lake Apache Iceberg utilizzando Amazon Athena, Amazon EMR e AWS Glue
Per utilizzare Iceberg su Amazon EMR con AWS CLI, crea innanzitutto un cluster con i seguenti passaggi. Per informazioni su come specificare la classificazione Iceberg utilizzando il AWS CLI, consulta o. Fornisci una configurazione utilizzando AWS CLI quando crei un cluster Fornitura di una configurazione utilizzando l'SDK Java per la creazione di un cluster Crea un file denominato configurations.json
con i seguenti contenuti:
[{
"Classification":"iceberg-defaults",
"Properties":{"iceberg.enabled":"true"}
}]
Quindi, crea un cluster con la seguente configurazione, sostituendo il percorso del bucket Amazon S3 di esempio e l'ID della sottorete con i tuoi valori:
aws emr create-cluster --release-label emr-6.9.0 \
--applications Name=Flink \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_flink_Iceberg_Cluster \
--log-uri s3://amzn-s3-demo-bucket/ \
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
In alternativa, puoi creare un cluster Amazon EMR 6.9.0 che include l'applicazione Flink e utilizza il file /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
come dipendenza JAR in un processo Flink.
Utilizzo del client Flink SQL
Lo script SQL Client si trova in /usr/lib/flink/bin
. Puoi eseguire lo script con il comando seguente:
flink-yarn-session -d # starting the Flink YARN Session in detached mode
./sql-client.sh
Questa operazione avvia una shell (interprete di comandi) Flink SQL.
Esempi di Flink
Creazione di una tabella Iceberg
Flink SQL
CREATE CATALOG glue_catalog WITH (
'type'='iceberg',
'warehouse'='<WAREHOUSE>',
'catalog-type'='glue'
);
USE CATALOG glue_catalog;
CREATE DATABASE IF NOT EXISTS <DB>;
USE <DB>;
CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
Tabella API
EnvironmentSettings settings =
EnvironmentSettings.newInstance().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
String warehouse = "<WAREHOUSE>";
String db = "<DB>";
tEnv.executeSql(
"CREATE CATALOG glue_catalog WITH (\n"
+ " 'type'='iceberg',\n"
+ " 'warehouse'='"
+ warehouse
+ "',\n"
+ " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n"
+ " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n"
+ " );");
tEnv.executeSql("USE CATALOG glue_catalog;");
tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";");
tEnv.executeSql("USE " + db + ";");
tEnv.executeSql(
"CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Scrittura su una tabella Iceberg
Flink SQL
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
Tabella API
tEnv.executeSql(
"INSERT INTO `glue_catalog`.`"
+ db
+ "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
API del flusso di dati
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
GenericRowData rowData1 = new GenericRowData(2);
rowData1.setField(0, 1L);
rowData1.setField(1, StringData.fromString("a"));
DataStream<RowData> input = env.fromElements(rowData1);
Map<String, String> props = new HashMap<();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStreamSink<Void> dataStreamSink =
FlinkSink.forRowData(input).tableLoader(tableLoader).append();
env.execute("Datastream Write");
Lettura da una tabella Iceberg
Flink SQL
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
Tabella API
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
API del flusso di dati
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
String db = "<DB Name>";
String warehouse = "<Warehouse Path>";
Map<String, String> props = new HashMap<>();
props.put("type", "iceberg");
props.put("warehouse", warehouse);
props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO");
CatalogLoader glueCatlogLoader =
CatalogLoader.custom(
"glue",
props,
new Configuration(),
"org.apache.iceberg.aws.glue.GlueCatalog");
TableLoader tableLoader =
TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample"));
DataStream<RowData> batch =
FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build();
batch.print().name("print-sink");
Utilizzo del catalogo Hive
Assicurati che le dipendenze di Flink e Hive siano risolte come descritto in Configurazione di Flink con Hive Metastore e Catalogo Glue.
Esecuzione di un processo Flink
Un modo per inviare un processo a Flink consiste nell'utilizzare una sessione YARN per il processo Flink. Puoi eseguire l'avvio tramite il comando seguente:
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
Considerazioni sull'utilizzo di Iceberg con Flink
-
Quando usi AWS Glue come catalogo per Iceberg, assicurati che il database in cui stai creando una tabella esista in AWS Glue. Se utilizzi servizi come AWS Lake Formation e non riesci a caricare il catalogo, assicurati di avere accesso adeguato al servizio per eseguire il comando.
L'integrazione di Iceberg Glue non funziona con il catalogo Redshift Managed Storage.