Iceberg クラスターを Flink で使用する - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Iceberg クラスターを Flink で使用する

Amazon EMR バージョン 6.9.0 以降では、オープンソースの Iceberg Flink インテグレーションを使用するときに必要なセットアップ手順なしで、Flink クラスターで Iceberg を使用できます。

アイスバーグクラスターの作成

Iceberg がインストールされたクラスターはAWS Management Console、、、、AWS CLI、または Amazon EMR API を使用して作成できます。このチュートリアルでは、を使用して Amazon EMR クラスターで Iceberg を操作します。AWS CLIコンソールを使用して Iceberg がインストールされたクラスターを作成するには、「Amazon Athena、Amazon EMR、AWS Glue を使用して Apache アイスバーグデータレイクを構築する」の手順に従ってください。

Iceberg on Amazon EMR を使用してを使用するにはAWS CLI、まず以下の手順でクラスターを作成します。を使用してアイスバーグ分類を指定する方法についてはAWS CLI、クラスター作成時に AWS CLI を使用して設定を指定するまたはを参照してくださいクラスター作成時に Java SDK を使用して設定を指定するconfigurations.json以下の内容でというファイルを作成します。

[{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]

次に、以下の設定でクラスターを作成し、例の Amazon S3 バケットパスとサブネット ID を独自の値に置き換えます。

aws emr create-cluster --release-label emr-6.9.0 \ --applications Name=Flink \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_flink_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef

または、Flink アプリケーションを含む Amazon EMR 6.9.0 クラスターを作成し、/usr/share/aws/iceberg/lib/iceberg-flink-runtime.jarそのファイルを Flink ジョブの JAR 依存関係として使用することもできます。

SQL クライアントスクリプトは以下にあります/usr/lib/flink/bin。スクリプトは、以下のコマンドで実行できます。

flink-yarn-session -d # starting the Flink YARN Session in detached mode ./sql-client.sh

これにより、Flink SQL シェルが起動します。

Iceberg テーブルを作成する

フリンク SQL

CREATE CATALOG glue_catalog WITH ( 'type'='iceberg', 'warehouse'='<WAREHOUSE>', 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog', 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO', 'lock-impl'='org.apache.iceberg.aws.glue.DynamoLockManager', 'lock.table'='myGlueLockTable' ); USE CATALOG glue_catalog; CREATE DATABASE IF NOT EXISTS <DB>; USE <DB>; CREATE TABLE IF NOT EXISTS `glue_catalog`.`<DB>`.`sample` (id int, data string);

テーブル API

EnvironmentSettings settings = EnvironmentSettings.newInstance().inBatchMode().build(); TableEnvironment tEnv = TableEnvironment.create(settings); String warehouse = "<WAREHOUSE>"; String db = "<DB>"; tEnv.executeSql( "CREATE CATALOG glue_catalog WITH (\n" + " 'type'='iceberg',\n" + " 'warehouse'='" + warehouse + "',\n" + " 'catalog-impl'='org.apache.iceberg.aws.glue.GlueCatalog',\n" + " 'io-impl'='org.apache.iceberg.aws.s3.S3FileIO'\n" + " );"); tEnv.executeSql("USE CATALOG glue_catalog;"); tEnv.executeSql("CREATE DATABASE IF NOT EXISTS " + db + ";"); tEnv.executeSql("USE " + db + ";"); tEnv.executeSql( "CREATE TABLE `glue_catalog`.`" + db + "`.`sample` (id bigint, data string);");

Iceberg テーブルに書き込み

フリンク SQL

INSERT INTO `glue_catalog`.`<DB>`.`sample` values (1, 'a'),(2,'b'),(3,'c');

テーブル API

tEnv.executeSql( "INSERT INTO `glue_catalog`.`" + db + "`.`sample` values (1, 'a'),(2,'b'),(3,'c');");

データストリーム API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; GenericRowData rowData1 = new GenericRowData(2); rowData1.setField(0, 1L); rowData1.setField(1, StringData.fromString("a")); DataStream<RowData> input = env.fromElements(rowData1); Map<String, String> props = new HashMap<(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStreamSink<Void> dataStreamSink = FlinkSink.forRowData(input).tableLoader(tableLoader).append(); env.execute("Datastream Write");

Iceberg テーブルから読み込み

フリンク SQL

SELECT * FROM `glue_catalog`.`<DB>`.`sample`;

テーブル API

Table result = tEnv.sqlQuery("select * from `glue_catalog`.`" + db + "`.`sample`;");

データストリーム API

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); String db = "<DB Name>"; String warehouse = "<Warehouse Path>"; Map<String, String> props = new HashMap<>(); props.put("type", "iceberg"); props.put("warehouse", warehouse); props.put("io-impl", "org.apache.iceberg.aws.s3.S3FileIO"); CatalogLoader glueCatlogLoader = CatalogLoader.custom( "glue", props, new Configuration(), "org.apache.iceberg.aws.glue.GlueCatalog"); TableLoader tableLoader = TableLoader.fromCatalog(glueCatlogLoader, TableIdentifier.of(db, "sample")); DataStream<RowData> batch = FlinkSource.forRowData().env(env).tableLoader(tableLoader).streaming(false).build(); batch.print().name("print-sink");

Hive カタログを使用する

Flink と Hive の依存関係がの説明どおりに解決されていることを確認してくださいAmazon EMR の Hive メタストアへの Flink の設定

Flink にジョブを送信する 1 つの方法は、ジョブごとの Flink YARN セッションを使用することです。これは以下のコマンドで起動できます。

sudo flink run -m yarn-cluster -p 4 -yjm 1024m -ytm 4096m $JAR_FILE_NAME