Spark で Iceberg クラスターを使用する - Amazon EMR

Spark で Iceberg クラスターを使用する

Amazon EMR バージョン 6.5.0 以降では、ブートストラップアクションを追加しなくても Spark クラスターで Iceberg を使用できます。Amazon EMR バージョン 6.4.0 以前の場合、ブートストラップアクションを使用して必要なすべての依存関係を事前インストールできます。

このチュートリアルでは、AWS CLI を使用して Amazon EMR Spark クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue」の手順に従ってください。

Iceberg クラスターの作成

Iceberg がインストールされたクラスターは、AWS Management Console、AWS CLI または Amazon EMR API を使用して作成できます。このチュートリアルでは、AWS CLI を使用して Amazon EMR クラスターで Iceberg を操作します。コンソールを使用して Iceberg がインストールされたクラスターを作成するには、「Build an Apache Iceberg data lake using Amazon Athena, Amazon EMR, and AWS Glue」の手順に従ってください。

AWS CLI で Amazon EMR の Iceberg を使用するには、まず以下の手順でクラスターを作成します。AWS CLI を使用して Iceberg 分類を指定する方法については、「クラスター作成時に AWS CLI を使用して設定を指定する」または「クラスター作成時に Java SDK を使用して設定を指定する」を参照してください。

  1. 以下のコンテンツを含む configurations.json ファイルを作成します。

    [{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
  2. 次に、以下の設定でクラスターを作成します。この例の Amazon S3 バケットパスとサブネット ID は、実際の値に置き換えてください。

    aws emr create-cluster --release-label emr-6.5.0 \ --applications Name=Spark \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_Spark_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

または、Spark アプリケーションを含む Amazon EMR クラスターを作成し、/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar ファイルを Spark ジョブの JAR 依存関係として追加することもできます。詳細については、「Submitting Applications」を参照してください。

この jar を Spark ジョブの依存関係として含めるには、以下の設定プロパティを Spark アプリケーションに追加します。

--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"

Spark ジョブの依存関係の詳細については、Apache Spark ドキュメント「Running Spark on Kubernetes」の「Dependency Management」を参照してください。

Iceberg の Spark セッションを初期化する

以下の例では、インタラクティブな Spark シェルを起動し、Spark submit を使用するか、Amazon EMR Notebooks を使用して、Amazon EMR で Iceberg を操作する方法を示します。

spark-shell
  1. SSH を使用してマスターノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してマスターノードに接続する」を参照してください。

  2. 以下のコマンドを入力して、Spark シェルを起動します。PySpark シェルを使用するには、spark-shellpyspark に置き換えます。

    spark-shell \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.dev.type=hadoop" \ --conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET/example-prefix/"
spark-submit
  1. SSH を使用してマスターノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してマスターノードに接続する」を参照してください。

  2. 以下のコマンドを入力して、Iceberg の Spark セッションを起動します。

    spark-submit \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.dev.type=hadoop" \ --conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET/example-prefix/"
EMR Studio notebooks

EMR Studio ノートブックを使用して Spark セッションを初期化するには、次の例のように、Amazon EMR Notebooks で %%configure マジックコマンドを使用して Spark セッションを設定します。詳細については、「Amazon EMR 管理ガイド」の「EMR Notebooks マジックを使用する」を参照してください。

%%configure -f { "conf":{ "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dev.type":"hadoop", "spark.sql.catalog.dev.warehouse":"s3://DOC-EXAMPLE-BUCKET/example-prefix/" } }

Iceberg テーブルへの書き込み

以下の例では、DataFrame を作成し、それを Iceberg データセットとして書き込む方法を示します。この例では、デフォルトの Hadoop ユーザーとして SSH を使用してマスターノードに接続しながら、Spark シェルを使用してデータセットを操作する方法を示しています。

注記

コードサンプルを Spark シェルに貼り付けるには、プロンプトで「:paste」と入力し、例を貼り付けて、[CTRL+D] を押します。

PySpark

Spark には、Python ベースのシェルである pyspark 用意されており、Python で記述された Spark プログラムのプロトタイプを作成するために使用できます。マスターノードで pyspark を起動します。

## Create a DataFrame. data = spark.createDataFrame([ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ],["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Iceberg dataset to the Amazon S3 location. spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame. val data = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Iceberg dataset to the Amazon S3 location. spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()

Iceberg テーブルからの読み込み

PySpark
df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Scala
val df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Spark SQL
SELECT * from dev.db.iceberg_table LIMIT 10

AWS Glue Data Catalog を Iceberg テーブルのメタストアとして使用するように Spark プロパティを設定します

AWS Glue Catalog を Iceberg テーブルのメタストアとして使用するには、Spark 設定プロパティを以下のように設定します。

spark-submit \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=s3://<bucket>/<prefix> \ --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \ --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.my_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager \ --conf spark.sql.catalog.my_catalog.lock.table=myGlueLockTable