Utilizzo di un cluster Delta Lake con Spark - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo di un cluster Delta Lake con Spark

A partire da Amazon EMR versione 6.9.0, puoi utilizzare Delta Lake con il cluster Spark senza dover eseguire operazioni di bootstrap. Per Amazon EMR rilascio 6.8.0 e precedenti, puoi utilizzare le operazioni di bootstrap per preinstallare le dipendenze necessarie.

Gli esempi seguenti lo utilizzano AWS CLI per lavorare con Delta Lake su un cluster Amazon EMR Spark.

Per utilizzare Delta Lake su Amazon EMR con AWS Command Line Interface, crea innanzitutto un cluster. Per informazioni su come specificare la classificazione Delta Lake con AWS Command Line Interface, consulta Fornire una configurazione utilizzando AWS Command Line Interface quando si crea un cluster o Fornire una configurazione con Java SDK quando si crea un cluster.

  1. Creare un file, configurations.json, con i seguenti contenuti:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"} }]
  2. Crea un cluster con la seguente configurazione, sostituendo bucket path e subnet ID di Amazon S3 di esempio con i tuoi valori.

    aws emr create-cluster --release-label emr-6.9.0 --applications Name=Spark --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://DOC-EXAMPLE-BUCKET/ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

    In alternativa, puoi creare un cluster Amazon EMR e l'applicazione Spark con i file seguenti come dipendenze JAR in un processo Spark:

    /usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar
    Nota

    Se utilizzi Amazon EMR versione 6.9.0 o successive, usa invece di. /usr/share/aws/delta/lib/delta-spark.jar /usr/share/aws/delta/lib/delta-core.jar

    Per ulteriori informazioni, consulta Invio di applicazioni.

    Per includere una dipendenza jar in un processo Spark, è possibile aggiungere le seguenti proprietà di configurazione all'applicazione Spark:

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

    Per ulteriori informazioni sulle dipendenze dei processi Spark, consulta la sezione Dependency Management (Gestione delle dipendenze).

    Se utilizzi Amazon EMR versione 6.9.0 o successive, aggiungi invece la configurazione. /usr/share/aws/delta/lib/delta-spark.jar

    --conf “spark.jars=/usr/share/aws/delta/lib/delta-spark.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

Inizializzazione della sessione Spark per Delta Lake

Negli esempi seguenti viene illustrato come avviare la shell (interprete di comandi) interattiva Spark, utilizzare Spark-submit o utilizzare i notebook Amazon EMR per lavorare con Delta Lake su Amazon EMR.

spark-shell
  1. Effettua la connessione al nodo primario tramite SSH. Per ulteriori informazioni, consulta la sezione Connect to the primary node using SSH (Connessione al nodo primario tramite SSH) nella Guida alla gestione di Amazon EMR.

  2. Immettere il seguente comando per avviare la shell Spark. Per usare la PySpark shell, sostituiscila con. spark-shell pyspark

    spark-shell \ --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \ --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
spark-submit
  1. Effettua la connessione al nodo primario tramite SSH. Per ulteriori informazioni, consulta la sezione Connect to the primary node using SSH (Connessione al nodo primario tramite SSH) nella Guida alla gestione di Amazon EMR.

  2. Inserisci il seguente comando per avviare la sessione Spark per Delta Lake.

    spark-submit —conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" —conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
EMR Studio notebooks

Per inizializzare una sessione Spark utilizzando i notebook Amazon EMR Studio, configura la sessione Spark utilizzando il comando %%configure magic nel notebook Amazon EMR, come nell'esempio seguente. Per ulteriori informazioni, consulta Utilizzo di EMR Notebooks magics nella Guida alla gestione di Amazon EMR.

%%configure -f { "conf": { "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension", "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog" } }

Scrittura in una tabella Delta Lake

L'esempio seguente mostra come creare un set di dati Delta Lake DataFrame e scriverlo come set di dati. In questa sezione, gli esempi illustrano l'utilizzo di set di dati con la shell (interprete di comandi) Spark connessi al nodo primario utilizzando SSH come utente hadoop predefinito.

Nota

Per incollare gli esempi di codice nella shell Spark, digitare :paste al prompt, incollare l'esempio e premere CTRL + D.

PySpark

Spark include anche una shell (interprete di comandi) basata su Python, pyspark, che puoi utilizzare per realizzare prototipi di programmi Spark scritti in Python. Proprio come con spark-shell, invoca pyspark sul nodo primario.

## Create a DataFrame data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")], ["id", "creation_date", "last_update_time"]) ## Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table'"""); data.writeTo("delta_table").append()
Scala
import org.apache.spark.sql.SaveMode import org.apache.spark.sql.functions._ // Create a DataFrame val data = Seq(("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date", "last_update_time") // Write a DataFrame as a Delta Lake dataset to the S3 location spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string) USING delta location 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table'"""); data.write.format("delta").mode("append").saveAsTable("delta_table")
SQL
-- Create a Delta Lake table with the S3 location CREATE TABLE delta_table(id string, creation_date string, last_update_time string) USING delta LOCATION 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table'; -- insert data into the table INSERT INTO delta_table VALUES ("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"), ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"), ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"), ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z");

Lettura da una tabella Delta Lake

PySpark
ddf = spark.table("delta_table") ddf.show()
Scala
val ddf = spark.table("delta_table") ddf.show()
SQL
SELECT * FROM delta_table;