Flink での Iceberg クラスターの使用 - Amazon EMR

Flink での Iceberg クラスターの使用

Amazon EMR バージョン 6.9.0 以降では、オープンソースの Iceberg Flink 統合を使用する際に必要なセットアップ手順なしに Flink クラスターで Iceberg を使用できます。

Iceberg クラスターの作成

Iceberg がインストールされたクラスターは、AWS Management Console、AWS CLI、または Amazon EMR API を使用して作成できます。このチュートリアルでは、AWS CLI を使用して Amazon EMR クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue」の手順に従ってください。

AWS CLI で Amazon EMR の Iceberg を使用するには、まず以下の手順でクラスターを作成します。AWS CLI を使用して Iceberg 分類を指定する方法については、「クラスター作成時に AWS CLI を使用して設定を指定する」または「クラスター作成時に Java SDK を使用して設定を指定する」を参照してください。configurations.json というファイルを次の内容で作成します。

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

次に、以下の設定でクラスターを作成し、この例の Amazon S3 バケットパスとサブネット ID を独自の値に置き換えます。

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

または、Flink アプリケーションを含む Amazon EMR 6.9.0 クラスターを作成し、/usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar ファイルを Flink ジョブの JAR 依存関係として使用することもできます。

SQL クライアントスクリプトは /usr/lib/flink/bin にあります。次のコマンドを使用してスクリプトを実行します。

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

これにより Flink SQL シェルが起動します。

Iceberg テーブルの作成

Flink SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager', 'lock.table'='myGlueLockTable' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

Table API

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

Iceberg テーブルへの書き込み

Flink SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

Table API

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

Datastream API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

Iceberg テーブルからの読み込み

Flink SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

Table API

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

Datastream API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

Hive カタログの使用

Flink と Hive の依存関係が「Hive Metastore と Glue Catalog を使用して Flink を設定する」の説明に従って解決されていることを確認します。

Flink にジョブを送信する方法の 1 つは、ジョブ単位の Flink YARN セッションを使用することです。これを実行するには、次のコマンドを使用します。

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME