Utilizza un cluster Iceberg con Spark - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizza un cluster Iceberg con Spark

A partire da Amazon EMR versione 6.5.0, puoi utilizzare Iceberg con il cluster Spark senza dover includere operazioni di bootstrap. Per Amazon EMR versione 6.4.0 e precedenti, puoi utilizzare un'operazione di bootstrap per preinstallare tutte le dipendenze necessarie.

In questo tutorial, lo utilizzerai AWS CLI per lavorare con Iceberg su un cluster Amazon EMR Spark. Per utilizzare la console per creare un cluster con Iceberg installato, segui la procedura illustrata in Creazione di un data lake Apache Iceberg utilizzando Amazon Athena, Amazon EMR e AWS Glue.

Creazione di un cluster Iceberg

Puoi creare un cluster con Iceberg installato utilizzando AWS Management Console, the AWS CLI o l'API Amazon EMR. In questo tutorial, lo utilizzerai AWS CLI per lavorare con Iceberg su un cluster Amazon EMR. Per utilizzare la console per creare un cluster con Iceberg installato, segui la procedura illustrata in Creazione di un data lake Apache Iceberg utilizzando Amazon Athena, Amazon EMR e AWS Glue.

Per utilizzare Iceberg su Amazon EMR con AWS CLI, crea innanzitutto un cluster con i seguenti passaggi. Per informazioni su come specificare la classificazione Iceberg utilizzando il AWS CLI, consulta o. Fornisci una configurazione utilizzando AWS CLI quando crei un cluster Fornitura di una configurazione utilizzando l'SDK Java per la creazione di un cluster

  1. Crea un file configurations.json con i seguenti contenuti:

    [{ "Classification":"iceberg-defaults", "Properties":{"iceberg.enabled":"true"} }]
  2. Quindi, crea un cluster con la seguente configurazione. Sostituisci il percorso del bucket Amazon S3 di esempio e l'ID della sottorete con i tuoi valori.

    aws emr create-cluster --release-label emr-6.5.0 \ --applications Name=Spark \ --configurations file://iceberg_configurations.json \ --region us-east-1 \ --name My_Spark_Iceberg_Cluster \ --log-uri s3://amzn-s3-demo-bucket/ \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

In alternativa, puoi creare un cluster Amazon EMR che include l'applicazione Spark e includere il file /usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar come dipendenza JAR in un processo Spark. Per ulteriori informazioni, consulta Invio di applicazioni.

Per includere il jar come dipendenza in un processo Spark, aggiungi la seguente proprietà di configurazione all'applicazione Spark:

--conf "spark.jars=/usr/share/aws/iceberg/lib/iceberg-spark3-runtime.jar"

Per ulteriori informazioni sulle dipendenze dei processi Spark, consulta Dependency Management (Gestione delle dipendenze) nel documento di Apache Spark Running Spark on Kubernetes (Esecuzione di Spark su Kubernetes).

Inizializzazione di una sessione Spark per Iceberg

Negli esempi seguenti viene illustrato come avviare la shell interattiva Spark, utilizzare Spark submit o utilizzare Amazon EMR Notebooks per lavorare con Iceberg su Amazon EMR.

spark-shell
  1. Connessione al nodo master tramite SSH Per ulteriori informazioni, consulta Connessione al nodo master tramite SSH nella Guida alla gestione di Amazon EMR.

  2. Immettere il seguente comando per avviare la shell Spark. Per usare la PySpark shell, spark-shell sostituiscila con. pyspark

    spark-shell \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket/prefix/ --conf spark.sql.catalog.my_catalog.type=glue \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark-submit
  1. Connessione al nodo master tramite SSH Per ulteriori informazioni, consulta Connessione al nodo master tramite SSH nella Guida alla gestione di Amazon EMR.

  2. Immettere il seguente comando per avviare la sessione Spark per Iceberg.

    spark-submit \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \ --conf spark.sql.catalog.my_catalog.type=glue \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
EMR Studio notebooks

Per inizializzare una sessione Spark utilizzando i notebook EMR Studio, configurare la sessione Spark utilizzando il comando magico %%configure sul tuo notebook Amazon EMR, come nell'esempio seguente. Per ulteriori informazioni, consulta Utilizzo di EMR Notebooks magics nella Guida alla gestione di Amazon EMR.

%%configure -f{ "conf":{ "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.my_catalog.type":"glue", "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/", "spark.sql.defaultCatalog", "my_catalog", "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" } }
CLI

Per inizializzare un cluster Spark utilizzando la CLI e impostare tutte le configurazioni predefinite della sessione Spark Iceberg, esegui il seguente esempio. Per ulteriori informazioni su come specificare una classificazione di configurazione utilizzando l' AWS CLI API Amazon EMR, consulta Configurare le applicazioni.

[ { "Classification": "spark-defaults", "Properties": { "spark.sql.catalog.my_catalog":"org.apache.iceberg.spark.SparkCatalog", "spark.sql.catalog.my_catalog.type":"glue", "spark.sql.catalog.my_catalog.warehouse":"s3://amzn-s3-demo-bucket1/prefix/", "spark.sql.defaultCatalog", "my_catalog", "spark.sql.extensions":"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions" } } ]

Scrittura su una tabella Iceberg

L'esempio seguente mostra come creare DataFrame e scrivere un set di dati Iceberg. In questa sezione, gli esempi illustrano l'utilizzo di set di dati con la Spark shell connessi al nodo master utilizzando SSH come utente hadoop predefinito.

Nota

Per incollare gli esempi di codice nella shell Spark, digitare :paste al prompt, incollare l'esempio e premere CTRL+D.

PySpark

Spark include anche una shell basata su Python, pyspark, che puoi utilizzare per realizzare prototipi di programmi Spark scritti in Python. Richiama pyspark sul nodo principale.

## Create a DataFrame. data = spark.createDataFrame([ ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ],["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Iceberg dataset to the Amazon S3 location. spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame. val data = Seq( ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z") ).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Iceberg dataset to the Amazon S3 location. spark.sql("""CREATE TABLE IF NOT EXISTS dev.db.iceberg_table (id string, creation_date string, last_update_time string) USING iceberg location 's3://amzn-s3-demo-bucket/example-prefix/db/iceberg_table'""") data.writeTo("dev.db.iceberg_table").append()

Lettura da una tabella Iceberg

PySpark
df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Scala
val df = spark.read.format("iceberg").load("dev.db.iceberg_table") df.show()
Spark SQL
SELECT * from dev.db.iceberg_table LIMIT 10

Utilizzo di AWS Glue Data Catalog con Spark Iceberg

Puoi connetterti a AWS Glue Data Catalog da Spark Iceberg. Questa sezione mostra diversi comandi per la connessione.

Connect al catalogo AWS Glue predefinito nella regione predefinita

Questo esempio mostra come effettuare la connessione utilizzando il tipo di catalogo Glue. Se non specificate un ID del catalogo, viene utilizzato il valore predefinito:

spark-submit \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \ --conf spark.sql.catalog.my_catalog.type=glue \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Connect a un catalogo AWS Glue con un ID di catalogo specifico

Questo esempio mostra come connettersi utilizzando un ID di catalogo:

spark-submit \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \ --conf spark.sql.catalog.my_catalog.type=glue \ --conf spark.sql.catalog.my_catalog.glue.id=AWS Glue catalog ID \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Questo comando può essere utilizzato per connettersi a un catalogo AWS Glue in un account diverso, a un catalogo RMS o a un catalogo federato.

Usare Iceberg REST Catalog (IRC) con Spark Iceberg

Le sezioni che seguono spiegano in dettaglio come configurare l'integrazione di Iceberg con un catalogo.

Connect all' AWS endpoint IRC di Glue Data Catalog

Quanto segue mostra un spark-submit comando di esempio per usare Iceberg REST:

spark-submit \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=glue catalog ID \ --conf spark.sql.catalog.my_catalog.type=rest \ --conf spark.sql.catalog.my_catalog.uri=glue endpoint URI/iceberg \ --conf spark.sql.catalog.my_catalog.rest.sigv4-enabled=true \ --conf spark.sql.catalog.my_catalog.rest.signing-name=glue \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Per utilizzarlo su un cluster abilitato al runtime-role, sono necessarie le seguenti impostazioni di configurazione aggiuntive di Spark:

"spark.hadoop.fs.s3.credentialsResolverClass": "software.amazon.glue.GlueTableCredentialsResolver", "spark.hadoop.catalog-impl": "org.apache.iceberg.aws.glue.GlueCatalog", "spark.hadoop.glue.id": glue catalog ID "spark.hadoop.glue.endpoint": "glue endpoint"

Per l'elenco degli URL degli endpoint AWS Glue per ogni regione, vedi AWS Glue endpoints e quote.

Connect a un endpoint IRC arbitrario

Quanto segue mostra un spark-submit comando di esempio per l'utilizzo di un endpoint IRC:

spark-submit \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=warehouse name \ --conf spark.sql.catalog.my_catalog.type=rest \ --conf spark.sql.catalog.my_catalog.uri=your rest endpoint \ --conf spark.sql.defaultCatalog=my_catalog \ --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions

Differenze di configurazione quando si utilizza Iceberg rispetto a SparkCatalog SparkSessionCatalog

Iceberg offre due modi per creare cataloghi Spark Iceberg. Puoi impostare la configurazione di Spark su uno o su. SparkCatalog SparkSessionCatalog

Usare Iceberg SparkCatalog

Di seguito viene illustrato il comando da utilizzare SparkCatalogcome catalogo Spark Iceberg:

spark-shell \ --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \ --conf spark.sql.catalog.my_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \ --conf spark.sql.catalog.my_catalog.type=glue \ --conf spark.sql.defaultCatalog=my_catalog

Considerazioni per questo approccio:

  • Puoi accedere alle tabelle Iceberg ma non ad altre tabelle.

  • Il nome del catalogo non può essere spark_catalog. Questo è il nome del catalogo iniziale in Spark. Si collega sempre a un metastore Hive. È il catalogo predefinito in Spark, a meno che l'utente non lo sovrascriva utilizzando. spark.sql.defaultCatalog

  • Puoi impostare il spark.sql.defaultCatalog nome del tuo catalogo per renderlo il catalogo predefinito.

Usare Iceberg SparkSessionCatalog

Di seguito viene illustrato il comando da utilizzare SparkSessionCatalogcome catalogo Spark Iceberg:

spark-shell \ --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \ --conf spark.sql.catalog.spark_catalog.warehouse=s3://amzn-s3-demo-bucket1/prefix \ --conf spark.sql.catalog.spark_catalog.type=glue

Considerazioni per questo approccio:

Utilizzo delle estensioni Iceberg Spark

Iceberg offre l'estensione Spark org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions che gli utenti possono impostare tramite la configurazione delle estensioni Spark. spark.sql.extensions Le estensioni abilitano le funzionalità chiave di Iceberg come DELETE, UPDATE e MERGE a livello di riga, le istruzioni e le procedure del linguaggio di definizione dei dati Spark specifiche di Iceberg, come compattazione, scadenza delle istantanee, ramificazione e tagging e così via. Per maggiori dettagli, consulta quanto segue:

Considerazioni sull'utilizzo di Iceberg con Spark