As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Usar um cluster do Iceberg com o Flink
Desde a versão 6.9.0 do Amazon EMR, você pode usar o Iceberg com um cluster do Flink sem as etapas de configuração necessárias ao usar a integração Iceberg-Flink de código aberto.
Criar um cluster no Iceberg
É possível criar um cluster com o Iceberg instalado usando o AWS Management Console, a AWS CLI ou a API do Amazon EMR. Neste tutorial, você usa o AWS CLI para trabalhar com o Iceberg em um cluster do Amazon EMR. Para usar o console para criar um cluster com o Iceberg instalado, siga as etapas em Criar um data lake no Apache Iceberg usando o Amazon Athena, o Amazon EMR e o AWS Glue
Para usar o Iceberg no Amazon EMR com AWS CLI o, primeiro crie um cluster com as etapas a seguir. Para obter informações sobre como especificar a classificação do Iceberg usando o AWS CLI, consulte Forneça uma configuração usando o AWS CLI ao criar um cluster ou. Fornecer uma configuração usando o SDK do Java ao criar um cluster Crie um arquivo denominado configurations.json
com o seguinte conteúdo:
[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
Em seguida, crie um cluster com a configuração a seguir, substituindo o exemplo de caminho do bucket do Amazon S3 e o ID da sub-rede pelos seus próprios valores:
aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://amzn-s3-demo-bucket/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
Como alternativa, é possível criar um cluster do Amazon EMR 6.9.0 contendo uma aplicação do Flink e usar o arquivo /usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
como uma dependência do JAR em um trabalho do Flink.
Usar o clinte SQL no Flink
O script do cliente SQL está localizado em /usr/lib/flink/bin
. Você pode executar o script com o seguinte comando:
flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh
Isso inicia um shell SQL no Flink.
Exemplos do Flink
Criar uma tabela no Iceberg
SQL no Flink
CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
API de tabela
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Gravar em uma tabela do Iceberg
SQL no Flink
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
API de tabela
tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
API de fluxo de dados
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");
Ler em uma tabela do Iceberg
SQL no Flink
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
API de tabela
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
API de fluxo de dados
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");
Usar o catálogo do Hive
Certifique-se de que as dependências do Flink e do Hive sejam resolvidas conforme descrito em Configurar o Flink com o Hive Metastore e o Catálogo do Glue.
Executar um trabalho do Flink
Uma forma de enviar um trabalho ao Flink é usar uma sessão do YARN do Flink por trabalho. Isso pode ser iniciado com o seguinte comando:
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME
Considerações sobre o uso do Iceberg com o Flink
-
Ao usar o AWS Glue como um catálogo para o Iceberg, certifique-se de que o banco de dados no qual você está criando uma tabela exista no AWS Glue. Se você estiver usando serviços como AWS Lake Formation e não conseguir carregar o catálogo, verifique se você tem acesso adequado ao serviço para executar o comando.
A integração com o Iceberg Glue não funciona com o catálogo de armazenamento gerenciado do Redshift.