翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Iceberg クラスターを Spark で使用する
Amazon EMR バージョン 6.5.0 以降では、ブートストラップアクションを含める必要なく、Spark クラスターで Iceberg を使用できます。Amazon EMR バージョン 6.4.0 以前では、ブートストラップアクションを使用して必要なすべての依存関係を事前にインストールすることができます。
このチュートリアルでは、を使用して Amazon EMR Spark クラスターで Iceberg を操作します。AWS CLIコンソールを使用して Iceberg がインストールされたクラスターを作成するには、「Amazon Athena、Amazon EMR、AWS Glue を使用して Apache アイスバーグデータレイクを構築する」の手順に従ってください。
アイスバーグクラスターの作成
Iceberg がインストールされたクラスターはAWS Management Console、、、、、、Amazon EMR API を使用して作成できます。AWS CLIこのチュートリアルでは、を使用して Amazon EMR クラスターで Iceberg を操作します。AWS CLIコンソールを使用して Iceberg がインストールされたクラスターを作成するには、「Amazon Athena、Amazon EMR、AWS Glue を使用して Apache アイスバーグデータレイクを構築する」の手順に従ってください。
Iceberg on Amazon EMR を使用してを使用するにはAWS CLI、まず以下の手順でクラスターを作成します。を使用してアイスバーグ分類を指定する方法についてはAWS CLI、クラスター作成時に AWS CLI を使用して設定を指定するまたはを参照してくださいクラスター作成時に Java SDK を使用して設定を指定する。
-
configurations.json
以下の内容でファイルを作成します。
[{
"Classification":"iceberg-defaults",
"Properties":{"iceberg.enabled":"true"}
}]
-
次に、以下の構成でクラスターを作成します。サンプルの Amazon S3 バケットパスとサブネット ID を自分のものに置き換えます。
aws emr create-cluster --release-label emr-6.5.0 \
--applications Name=Spark \
--configurations file://iceberg_configurations.json \
--region us-east-1 \
--name My_Spark_Iceberg_Cluster \
--log-uri s3://DOC-EXAMPLE-BUCKET/
\
--instance-type m5.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole_V2 \
--ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
または、Spark アプリケーションを含む Amazon EMR クラスターを作成し、/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar
そのファイルを JAR 依存関係として Spark ジョブに含めることもできます。詳細については、「申請書の提出」を参照してください。
jar を Spark ジョブの依存関係として含めるには、以下の設定プロパティを Spark アプリケーションに追加します。
--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"
Spark ジョブの依存関係について詳しくは、Apache Spark のドキュメント「Kubernetes での Spark の実行」の「依存関係の管理」を参照してください。
Iceberg の Spark セッションを初期化する
以下の例では、インタラクティブな Spark シェルを起動し、Spark サブミットを使用するか、Amazon EMR Notee Data Catalog を使用して、Amazon EMR で Ie Data Catalog を Amazon EMR で Ie EMR で操作する方法を示しています。
- spark-shell
-
-
SSH を使用してマスターノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してマスターノードに接続する」を参照してください。
-
以下のコマンドを入力して、Spark シェルを起動します。 PySpark シェルを使用するには、spark-shell
に置き換えてくださいpyspark
。
spark-shell \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.dev.type=hadoop" \
--conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET
/example-prefix
/"
- spark-submit
-
-
SSH を使用してマスターノードに接続します。詳細については、「Amazon EMR 管理ガイド」の「SSH を使用してマスターノードに接続する」を参照してください。
-
Iceberg の Spark セッションを起動するには、以下のコマンドを入力します。
spark-submit \
--conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \
--conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \
--conf "spark.sql.catalog.dev.type=hadoop" \
--conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET
/example-prefix
/"
- EMR Studio notebooks
-
EMR Stue Data Catalog を Spark セッションに設定するには、次の例のように、Amazon EMR ノートブックで%%configure
マジックコマンドを使用して Spark セッションを設定します。詳細については、Amazon EMR 管理ガイドの「EMR ノートブックの魔法を使う」を参照してください。
%%configure -f
{
"conf":{
"spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
"spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog",
"spark.sql.catalog.dev.type":"hadoop",
"spark.sql.catalog.dev.warehouse":"s3://DOC-EXAMPLE-BUCKET
/example-prefix
/"
}
}
Iceberg テーブルに書き込み
次の例は、を作成し、Iceberg データセットとして記述する方法を示しています。 DataFrame これらの例は、デフォルトの hadoop ユーザーとして SSH を使用してマスターノードに接続し、Spark シェルを使用してデータセットを操作する方法を示しています。
コードサンプルを Spark シェルに貼り付けるには、:paste
プロンプトに入力し、例を貼り付けてから、を押しますCTRL+D
。
- PySpark
-
Spark には Python ベースのシェルが含まれておりpyspark
、これを使用して Python で記述された Spark プログラムのプロトタイプを作成できます。pyspark
マスターノードで呼び出します。
## Create a DataFrame.
data = spark.createDataFrame([
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
],["id", "creation_date", "last_update_time"])
## Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://DOC-EXAMPLE-BUCKET
/example-prefix
/db/iceberg_table'""")
data.writeTo("dev.db.iceberg_table").append()
- Scala
-
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._
// Create a DataFrame.
val data = Seq(
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")
).toDF("id", "creation_date", "last_update_time")
// Write a DataFrame as a Iceberg dataset to the Amazon S3 location.
spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string,
creation_date string,
last_update_time string)
USING iceberg
location 's3://DOC-EXAMPLE-BUCKET
/example-prefix
/db/iceberg_table'""")
data.writeTo("dev.db.iceberg_table").append()
Iceberg テーブルから読み込み
- PySpark
-
df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
- Scala
-
val df = spark.read.format("iceberg").load("dev.db.iceberg_table")
df.show()
- Spark SQL
-
SELECT * from dev.db.iceberg_table LIMIT 10
AWSGlue データカタログを Iceberg テーブルのメタストアとして使用するように Spark プロパティを設定します。
AWSGlue Catalog を Iceberg テーブルのメタストアとして使用するには、Spark 設定プロパティを以下のように設定します。
spark-submit \
--conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.my_catalog.warehouse=s3://<bucket>/<prefix> \
--conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \
--conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \
--conf spark.sql.catalog.my_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager \
--conf spark.sql.catalog.my_catalog.lock.table=myGlueLockTable