Amazon S3 Tables Catalog for Apache Iceberg を使用した Amazon S3 テーブルへのアクセス
Amazon S3 Tables Catalog for Apache Spark クライアントカタログを使用して、Apache Iceberg のようなオープンソースクエリエンジンから S3 テーブルにアクセスできます。Amazon S3 Tables Catalog for Apache Iceberg は、AWS Labs がホストするオープンソースライブラリです。これは、クエリエンジン内の Apache Iceberg オペレーション (テーブル検出、メタデータの更新、テーブルの追加または削除など) を S3 Tables API オペレーションに変換することで機能します。
Amazon S3 Tables Catalog for Apache Iceberg は、s3-tables-catalog-for-iceberg.jar
という Maven JAR として配布されています。クライアントカタログ JAR は、AWS Labs GitHub リポジトリ
Apache Spark での Amazon S3 Tables Catalog for Apache Iceberg の使用
Spark セッションを初期化するときに、Amazon S3 Tables Catalog for Apache Iceberg クライアントカタログを使用して、オープンソースアプリケーションからテーブルに接続できます。セッション設定で Iceberg と Amazon S3 の依存関係を指定し、テーブルバケットをメタデータウェアハウスとして使用するカスタムカタログを作成します。
前提条件
テーブルバケットと S3 Tables アクションへのアクセス権を持つ IAM ID。詳細については、「S3 Tables のアクセス管理」を参照してください。
Amazon S3 Tables Catalog for Apache Iceberg を使用して Spark セッションを初期化するには
-
次のコマンドを使用して Spark を初期化します。コマンドを使用するには、Amazon S3 Tables Catalog for Apache Iceberg の
バージョン番号
を AWS Labs GitHub リポジトリの最新バージョンに置き換え、 テーブルバケット ARN
を実際のテーブルバケット ARN に置き換えます。spark-shell \ --packages org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.6.1,software.amazon.s3tables:s3-tables-catalog-for-iceberg-runtime:
0.1.4
\ --conf spark.sql.catalog.s3tablesbucket
=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.s3tablesbucket
.catalog-impl=software.amazon.s3tables.iceberg.S3TablesCatalog \ --conf spark.sql.catalog.s3tablesbucket
.warehouse=arn:aws:s3tables:us-east-1
:111122223333
:bucket/amzn-s3-demo-table-bucket
\ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
Spark SQL を使用した S3 テーブルのクエリ
Spark を使用すると、S3 テーブルで DQL、DML、DDL オペレーションを実行できます。テーブルをクエリするときは、セッションカタログ名を含む完全修飾テーブル名を次のパターンに従って使用します。
CatalogName
.NamespaceName
.TableName
次のクエリ例は、S3 テーブルを操作するいくつかの方法を示しています。クエリエンジンでこれらのサンプルクエリを使用するには、ユーザー入力プレースホルダー
値を実際の値に置き換えます。
Spark を使用してテーブルをクエリするには
-
名前空間を作成します。
spark.sql(" CREATE NAMESPACE IF NOT EXISTS
s3tablesbucket
.my_namespace
") -
テーブルの作成
spark.sql(" CREATE TABLE IF NOT EXISTS
s3tablesbucket
.my_namespace
.`my_table
` ( id INT, name STRING, value INT ) USING iceberg ") -
テーブルに対してクエリを実行する
spark.sql(" SELECT * FROM
s3tablesbucket
.my_namespace
.`my_table
` ").show() -
テーブルにデータを挿入する
spark.sql( """ INSERT INTO
s3tablesbucket
.my_namespace
.my_table
VALUES (1, 'ABC', 100), (2, 'XYZ', 200) """) -
テーブルに既存のデータファイルをロードする
データを Spark に読み取ります。
val data_file_location = "Path such as S3 URI to data file" val data_file = spark.read.parquet(
data_file_location
)データを Iceberg テーブルに書き込みます。
data_file.writeTo("
s3tablesbucket
.my_namespace
.my_table
").using("Iceberg").tableProperty ("format-version", "2").createOrReplace()