Flink での Iceberg クラスターの使用
Amazon EMR バージョン 6.9.0 以降では、オープンソースの Iceberg Flink 統合を使用する際に必要なセットアップ手順なしに Flink クラスターで Iceberg を使用できます。
Iceberg クラスターの作成
Iceberg がインストールされたクラスターは、AWS Management Console、AWS CLI、または Amazon EMR API を使用して作成できます。このチュートリアルでは、AWS CLI を使用して Amazon EMR クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue
AWS CLI で Amazon EMR の Iceberg を使用するには、まず以下の手順でクラスターを作成します。AWS CLI を使用して Iceberg 分類を指定する方法については、「クラスター作成時に AWS CLI を使用して設定を指定する」または「クラスター作成時に Java SDK を使用して設定を指定する」を参照してください。configurations.json
というファイルを次の内容で作成します。
[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
次に、以下の設定でクラスターを作成し、この例の Amazon S3 バケットパスとサブネット ID を独自の値に置き換えます。
aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
または、Flink アプリケーションを含む Amazon EMR 6.9.0 クラスターを作成し、/usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
ファイルを Flink ジョブの JAR 依存関係として使用することもできます。
Flink SQL クライアントの使用
SQL クライアントスクリプトは /usr/lib/flink/bin
にあります。次のコマンドを使用してスクリプトを実行します。
flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh
これにより Flink SQL シェルが起動します。
Flink の例
Iceberg テーブルの作成
Flink SQL
CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager', 'lock.table'='myGlueLockTable' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
Table API
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Iceberg テーブルへの書き込み
Flink SQL
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
Table API
tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
Datastream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");
Iceberg テーブルからの読み込み
Flink SQL
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
Table API
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
Datastream API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");
Hive カタログの使用
Flink と Hive の依存関係が「Hive Metastore と Glue Catalog を使用して Flink を設定する」の説明に従って解決されていることを確認します。
Flink ジョブの実行
Flink にジョブを送信する方法の 1 つは、ジョブ単位の Flink YARN セッションを使用することです。これを実行するには、次のコマンドを使用します。
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME