Gunakan klaster dengan Iceberg - Amazon EMR

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Gunakan klaster dengan Iceberg

Gunakan cluster Iceberg dengan Spark

Dimulai dengan Amazon EMR versi 6.5.0, Anda dapat menggunakan Iceberg dengan klaster Spark Anda tanpa memerlukan tindakan bootstrap. Untuk Amazon EMR versi 6.4.0 dan yang lebih lama, Anda dapat menggunakan tindakan bootstrap untuk menginstal semua dependensi yang diperlukan sebelumnya.

Dalam tutorial ini, kita akan menggunakanAWS CLI untuk bekerja dengan Iceberg pada cluster Amazon EMR Spark. Untuk menggunakan konsol untuk membuat klaster dengan Iceberg diinstal, ikuti langkah-langkah dalam Membangun data lake Apache Iceberg menggunakan Amazon Athena, Amazon EMR, danAWS Glue.

Buat klaster

Anda dapat membuat klaster dengan Iceberg yang diinstal menggunakanAWS Management Console,AWS CLI atau API Amazon EMR. Dalam tutorial ini, kita akan menggunakanAWS CLI untuk bekerja dengan Iceberg pada cluster Amazon EMR. Untuk menggunakan konsol untuk membuat klaster dengan Iceberg diinstal, ikuti langkah-langkah dalam Membangun data lake Apache Iceberg menggunakan Amazon Athena, Amazon EMR, danAWS Glue.

Untuk menggunakan Iceberg di Amazon EMR denganAWS CLI, buat klaster dengan langkah-langkah berikut. Untuk informasi tentang menentukan klasifikasi gunung es menggunakanAWS CLI, melihatSediakan konfigurasi menggunakan AWS CLI saat Anda membuat sebuah klaster atauSediakan konfigurasi menggunakan Java SDK ketika Anda membuat sebuah klaster.

  1. Buat file, configurations.json, dengan konten berikut:

    [{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
  2. Selanjutnya, buat klaster dengan konfigurasi berikut, ganti contoh jalur bucket Amazon S3 dan subnet ID dengan milik Anda sendiri.

    aws emr create-cluster --release-label emr-6.5.0 \ --applications Name=Spark \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_Spark_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

Atau, Anda dapat membuat klaster Amazon EMR termasuk aplikasi Spark dan menyertakan file/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar sebagai dependensi JAR dalam pekerjaan Spark. Untuk informasi selengkapnya, lihat Mengirimkan Aplikasi.

Untuk menyertakan jar sebagai dependensi dalam pekerjaan Spark, Anda dapat menambahkan properti konfigurasi berikut ke aplikasi Spark:

--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"

Untuk informasi selengkapnya tentang dependensi pekerjaan Spark, lihat.

Inisialisasi sesi Spark

Contoh berikut menunjukkan cara meluncurkan shell Spark interaktif, gunakan Spark kirim, atau gunakan Amazon EMR Notebooks untuk bekerja dengan Iceberg di Amazon EMR.

spark-shell
  1. Connect ke simpul utama menggunakan SSH. Untuk informasi selengkapnya, lihat Connect ke simpul utama menggunakan SSH di Panduan Pengelolaan Amazon EMR.

  2. Masukkan perintah berikut untuk meluncurkan shell Spark. Untuk menggunakan PySpark shell, gantispark-shell denganpyspark.

    spark-shell \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.dev.type=hadoop" \ --conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET/example-prefix/"
spark-submit
  1. Connect ke simpul utama menggunakan SSH. Untuk informasi selengkapnya, lihat Connect ke simpul utama menggunakan SSH di Panduan Pengelolaan Amazon EMR.

  2. Masukkan perintah berikut untuk meluncurkan sesi Spark untuk Iceberg.

    spark-submit \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.dev.type=hadoop" \ --conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET/example-prefix/"
EMR Studio notebooks

Untuk menginisialisasi sesi Spark menggunakan notebook EMR Studio, konfigurasikan sesi Spark Anda menggunakan perintah%%configure ajaib di notebook Amazon EMR Anda, seperti pada contoh berikut. Untuk informasi selengkapnya, lihat Menggunakan EMR Notebooks di Panduan Pengelolaan Amazon EMR.

%%configure -f { "conf":{ "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dev.type":"hadoop", "spark.sql.catalog.dev.warehouse":"s3://DOC-EXAMPLE-BUCKET/example-prefix/" } }

Menulis ke meja gunung es

Contoh berikut menunjukkan cara membuat DataFrame dan menuliskannya sebagai set data Iceberg. Contoh menunjukkan bekerja dengan set data menggunakan shell Spark saat terhubung ke simpul utama menggunakan SSH sebagai default hadoop

catatan

Untuk menyisipkan contoh kode ke shell Spark, ketikkan:paste pada prompt, tempel contoh, dan kemudian tekanCTRL+D.

PySpark

Spark termasuk shell berbasis Python,pyspark, yang dapat Anda gunakan untuk prototipe program Spark ditulis dengan Python. Sama seperti dengan spark-shell, memohonpyspark pada node master.

## Create a DataFrame data = spark.createDataFrame([ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ],["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Iceberg dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame val data = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Iceberg dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()

Baca dari meja gunung es

PySpark
df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Scala
val df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Spark SQL
SELECT * from dev.db.iceberg_table LIMIT 10

Konfigurasikan properti Spark untuk menggunakan Katalog DataAWS Glue sebagai tabel Iceberg metastore

Untuk menggunakan KatalogAWS Glue sebagai tabel Metastore for Iceberg, atur properti konfigurasi Spark seperti di bawah ini:

spark-submit \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=s3://<bucket>/<prefix> \ --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \ --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.my_catalog.lock-impl=org.apache.iceberg.aws.glue.DynamoLockManager \ --conf spark.sql.catalog.my_catalog.lock.table=myGlueLockTable

Gunakan cluster Gunung Es dengan Trino

Dimulai dengan Amazon EMR versi 6.6.0, Anda dapat menggunakan Iceberg dengan klaster Trino Anda.

Dalam tutorial ini, kita akan menggunakanAWS CLI untuk bekerja dengan Iceberg pada klaster Amazon EMR Trino. Untuk menggunakan konsol untuk membuat klaster dengan Iceberg diinstal, ikuti langkah-langkah dalam Membangun data lake Apache Iceberg menggunakan Amazon Athena, Amazon EMR, danAWS Glue.

Buat klaster

Untuk menggunakan Iceberg di Amazon EMR denganAWS CLI, buat klaster dengan langkah-langkah berikut. Untuk informasi tentang menentukan klasifikasi gunung es menggunakanAWS CLI, melihatSediakan konfigurasi menggunakan AWS CLI saat Anda membuat sebuah klaster atauSediakan konfigurasi menggunakan Java SDK ketika Anda membuat sebuah klaster.

  1. Buat file,iceberg.properties, dan tetapkan nilai untuk katalog pilihan Anda. Misalnya, jika Anda ingin menggunakan Hive meastore sebagai katalog Anda, file Anda harus memiliki konten berikut.

    connector.name=iceberg hive.metastore.uri=thrift://localhost:9083

    Jika Anda ingin menggunakan Katalog DataAWS Glue sebagai toko Anda, file Anda harus memiliki konten berikut.

    connector.name=iceberg iceberg.catalog.type=glue
  2. Buat tindakan bootstrap yangiceberg.properties disalin dari Amazon S3/etc/trino/conf/catalog/iceberg.properties, seperti pada contoh berikut. Untuk informasi tentang tindakan bootstrap, lihat Buat tindakan bootstrap untuk menginstal perangkat lunak tambahan.

    set -ex sudo aws s3 cp s3://DOC-EXAMPLE-BUCKET/iceberg.properties /etc/trino/conf/catalog/iceberg.properties
  3. Buat cluster dengan konfigurasi berikut, ganti contoh bootstrap action script path dan nama kunci dengan nama Anda sendiri.

    aws emr create-cluster --release-label emr-6.7.0 \ --applications Name=Trino \ --region us-east-1 \ --name My_Trino_Iceberg_Cluster \ --bootstrap-actions '[{"Path":"s3://DOC-EXAMPLE-BUCKET","Name":"Add iceberg.properties"}]' \ --instance-groups InstanceGroupType=MASTER,InstanceCount=1,InstanceType=c3.4xlarge InstanceGroupType=CORE,InstanceCount=3,InstanceType=c3.4xlarge \ --use-default-roles \ --ec2-attributes KeyName=<key-name>

Inisialisasi sesi Trino untuk Iceberg

Untuk menginisialisasi sesi Trino, jalankan perintah berikut.

trino-cli --catalog iceberg

Menulis ke meja gunung es

Buat dan tulis ke meja Anda dengan perintah SQL berikut.

trino> SHOW SCHEMAS; trino> CREATE TABLE default.iceberg_table ( id int, data varchar, category varchar) WITH ( format = 'PARQUET', partitioning = ARRAY['category', 'bucket(id, 16)'], location = 's3://DOC-EXAMPLE-BUCKET/<prefix>') trino> INSERT INTO default.iceberg_table VALUES (1,'a','c1'), (2,'b','c2'), (3,'c','c3');

Baca dari meja untuk gunung es

Untuk membaca dari tabel Iceberg Anda, jalankan perintah berikut.

trino> SELECT * from default.iceberg_table;