翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Iceberg クラスターを Flink で使用する
Amazon EMR バージョン 6.9.0 以降では、オープンソースの Iceberg Flink インテグレーションを使用するときに必要なセットアップ手順なしで、Flink クラスターで Iceberg を使用できます。
アイスバーグクラスターの作成
Iceberg がインストールされたクラスターはAWS Management Console、、、、AWS CLI、または Amazon EMR API を使用して作成できます。このチュートリアルでは、を使用して Amazon EMR クラスターで Iceberg を操作します。AWS CLIコンソールを使用して Iceberg がインストールされたクラスターを作成するには、「Amazon Athena、Amazon EMR、AWS Glue を使用して Apache アイスバーグデータレイクを構築する
Iceberg on Amazon EMR を使用してを使用するにはAWS CLI、まず以下の手順でクラスターを作成します。を使用してアイスバーグ分類を指定する方法についてはAWS CLI、クラスター作成時に AWS CLI を使用して設定を指定するまたはを参照してくださいクラスター作成時に Java SDK を使用して設定を指定する。configurations.json
以下の内容でというファイルを作成します。
[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
次に、以下の設定でクラスターを作成し、例の Amazon S3 バケットパスとサブネット ID を独自の値に置き換えます。
aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef
または、Flink アプリケーションを含む Amazon EMR 6.9.0 クラスターを作成し、/usr/share/aws/iceberg/lib/iceberg-flink-runtime.jar
そのファイルを Flink ジョブの JAR 依存関係として使用することもできます。
Flink SQL クライアントを使用する
SQL クライアントスクリプトは以下にあります/usr/lib/flink/bin
。スクリプトは、以下のコマンドで実行できます。
flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh
これにより、Flink SQL シェルが起動します。
Flink サンプル
Iceberg テーブルを作成する
フリンク SQL
CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager', 'lock.table'='myGlueLockTable' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);
テーブル API
EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");
Iceberg テーブルに書き込み
フリンク SQL
INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');
テーブル API
tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");
データストリーム API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");
Iceberg テーブルから読み込み
フリンク SQL
SELECT * FROM `glue_catalog`.`<DB>`.`sample`;
テーブル API
Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");
データストリーム API
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");
Hive カタログを使用する
Flink と Hive の依存関係がの説明どおりに解決されていることを確認してくださいAmazon EMR の Hive メタストアへの Flink の設定。
Flink Job 実行
Flink にジョブを送信する 1 つの方法は、ジョブごとの Flink YARN セッションを使用することです。これは以下のコマンドで起動できます。
sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME