Utilizza un cluster Iceberg con Spark - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizza un cluster Iceberg con Spark

A partire da Amazon EMR versione 6.5.0, puoi utilizzare Iceberg con il cluster Spark senza dover includere operazioni di bootstrap. Per Amazon EMR versione 6.4.0 e precedenti, puoi utilizzare un'operazione di bootstrap per preinstallare tutte le dipendenze necessarie.

In questo tutorial, lo utilizzerai AWS CLI per lavorare con Iceberg su un cluster Amazon EMR Spark. Per utilizzare la console per creare un cluster con Iceberg installato, segui la procedura illustrata in Creazione di un data lake Apache Iceberg utilizzando Amazon Athena, Amazon EMR e AWS Glue.

Creazione di un cluster Iceberg

Puoi creare un cluster con Iceberg installato utilizzando AWS Management Console, the AWS CLI o l'API Amazon EMR. In questo tutorial, lo utilizzerai AWS CLI per lavorare con Iceberg su un cluster Amazon EMR. Per utilizzare la console per creare un cluster con Iceberg installato, segui la procedura illustrata in Creazione di un data lake Apache Iceberg utilizzando Amazon Athena, Amazon EMR e AWS Glue.

Per utilizzare Iceberg su Amazon EMR con AWS CLI, crea innanzitutto un cluster con i seguenti passaggi. Per informazioni su come specificare la classificazione Iceberg utilizzando il AWS CLI, consulta o. Fornisci una configurazione utilizzando AWS CLI quando crei un cluster Fornitura di una configurazione utilizzando l'SDK Java per la creazione di un cluster

  1. Crea un file configurations.json con i seguenti contenuti:

    [{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
  2. Quindi, crea un cluster con la seguente configurazione. Sostituisci il percorso del bucket Amazon S3 di esempio e l'ID della sottorete con i tuoi valori.

    aws emr create-cluster --release-label emr-6.5.0 \ --applications Name=Spark \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_Spark_Iceberg_Cluster \ --log-uri s3://DOC-EXAMPLE-BUCKET/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

In alternativa, puoi creare un cluster Amazon EMR che include l'applicazione Spark e includere il file /usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar come dipendenza JAR in un processo Spark. Per ulteriori informazioni, consulta Invio di applicazioni.

Per includere il jar come dipendenza in un processo Spark, aggiungi la seguente proprietà di configurazione all'applicazione Spark:

--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"

Per ulteriori informazioni sulle dipendenze dei processi Spark, consulta Dependency Management (Gestione delle dipendenze) nel documento di Apache Spark Running Spark on Kubernetes (Esecuzione di Spark su Kubernetes).

Inizializzazione di una sessione Spark per Iceberg

Negli esempi seguenti viene illustrato come avviare la shell interattiva Spark, utilizzare Spark submit o utilizzare Amazon EMR Notebooks per lavorare con Iceberg su Amazon EMR.

spark-shell
  1. Connessione al nodo master tramite SSH Per ulteriori informazioni, consulta Connessione al nodo master tramite SSH nella Guida alla gestione di Amazon EMR.

  2. Immettere il seguente comando per avviare la shell Spark. Per usare la PySpark shell, spark-shell sostituiscila con. pyspark

    spark-shell \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.dev.type=hadoop" \ --conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET/example-prefix/"
spark-submit
  1. Connessione al nodo master tramite SSH Per ulteriori informazioni, consulta Connessione al nodo master tramite SSH nella Guida alla gestione di Amazon EMR.

  2. Immettere il seguente comando per avviare la sessione Spark per Iceberg.

    spark-submit \ --conf "spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" \ --conf "spark.sql.catalog.dev=org.apache.iceberg.spark.SparkCatalog" \ --conf "spark.sql.catalog.dev.type=hadoop" \ --conf "spark.sql.catalog.dev.warehouse=s3://DOC-EXAMPLE-BUCKET/example-prefix/"
EMR Studio notebooks

Per inizializzare una sessione Spark utilizzando i notebook EMR Studio, configurare la sessione Spark utilizzando il comando magico %%configure sul tuo notebook Amazon EMR, come nell'esempio seguente. Per ulteriori informazioni, consulta Utilizzo di EMR Notebooks magics nella Guida alla gestione di Amazon EMR.

%%configure -f { "conf":{ "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions", "spark.sql.catalog.dev":"org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.dev.type":"hadoop", "spark.sql.catalog.dev.warehouse":"s3://DOC-EXAMPLE-BUCKET/example-prefix/" } }

Scrittura su una tabella Iceberg

L'esempio seguente mostra come creare DataFrame e scrivere un set di dati Iceberg. In questa sezione, gli esempi illustrano l'utilizzo di set di dati con la Spark shell connessi al nodo master utilizzando SSH come utente hadoop predefinito.

Nota

Per incollare gli esempi di codice nella shell Spark, digitare :paste al prompt, incollare l'esempio e premere CTRL+D.

PySpark

Spark include anche una shell basata su Python, pyspark, che puoi utilizzare per realizzare prototipi di programmi Spark scritti in Python. Richiama pyspark sul nodo principale.

## Create a DataFrame. data = spark.createDataFrame([ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ],["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Iceberg dataset to the Amazon S3 location. spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame. val data = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Iceberg dataset to the Amazon S3 location. spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()

Lettura da una tabella Iceberg

PySpark
df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Scala
val df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Spark SQL
SELECT * from dev.db.iceberg_table LIMIT 10

Configura le proprietà Spark per utilizzare il AWS Glue Data Catalog come metastore delle tabelle Iceberg

Per utilizzare AWS Glue Catalog come Metastore per le tabelle Iceberg, imposta le proprietà di configurazione Spark come segue:

spark-submit \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=s3://<bucket>/<prefix> \ --conf spark.sql.catalog.my_catalog.catalog-impl=org.apache.iceberg.aws.glue.GlueCatalog \ --conf spark.sql.catalog.my_catalog.io-impl=org.apache.iceberg.aws.s3.S3FileIO \ --conf spark.sql.catalog.my_catalog.lock-impl=org.apache.iceberg.aws.dynamodb.DynamoDbLockManager \ --conf spark.sql.catalog.my_catalog.lock.table=myGlueLockTable