Using a cluster with Delta Lake installed

Using a Delta Lake cluster with Spark

Starting with Amazon EMR version 6.9.0, you can use Delta Lake with your Spark cluster without requiring bootstrap actions. For Amazon EMR release 6.8.0 and earlier, you can use bootstrap actions to pre-install all necessary dependencies.

In this tutorial, we will use the AWS CLI to work with Delta Lake on an Amazon EMR Spark cluster.

To use Delta Lake on Amazon EMR with the AWS Command Line Interface, first create a cluster with the following steps. For information on specifying the Delta Lake classification with the AWS Command Line Interface, see Supply a configuration using the AWS Command Line Interface when you create a cluster or Supply a configuration using the Java SDK when you create a cluster.

  1. Create a file, delta_configurations.json, with the following content:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}]
  2. Create a cluster with the following configuration, replacing the example Amazon S3 bucket path and the subnet ID with your own.

    aws emr create-cluster --release-label emr-6.9.0 --applications Name=Spark --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://DOC-EXAMPLE-BUCKET/ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

    Alternatively, you can create an Amazon EMR cluster that includes the Spark application, and include the following files as JAR dependencies in a Spark job:

    /usr/share/aws/delta/lib/delta-core.jar, /usr/share/aws/delta/lib/delta-storage.jar, /usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar

    For more information, see Submitting Applications.

    To include a JAR dependency in the Spark job, you can add the following configuration property to the Spark application:

    --conf "spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar,/usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar"

    For more information about Spark job dependencies, see Dependency Management.
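
    For example, a complete spark-submit invocation that supplies these JARs might look like the following sketch. The application script path on the last line is a hypothetical placeholder; substitute your own application.

    # The application script path below is a hypothetical placeholder.
    spark-submit \
      --conf "spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar,/usr/share/aws/delta/lib/delta-storage-s3-dynamodb.jar" \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" \
      s3://DOC-EXAMPLE-BUCKET/scripts/delta_app.py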

Initialize a Spark session for Delta Lake

The following examples show how to launch the interactive Spark shell, use spark-submit, or use EMR Studio notebooks to work with Delta Lake on Amazon EMR.

spark-shell
  1. Connect to the primary node using SSH. For more information, see Connect to the primary node using SSH in the Amazon EMR Management Guide.

  2. Enter the following command to launch the Spark shell. To use the PySpark shell, replace spark-shell with pyspark.

    spark-shell \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
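
    Once the shell starts, you can optionally confirm that the Delta extension is active. This sanity check is an addition to the original steps:

    // Optional sanity check: the configured SQL extensions should include
    // io.delta.sql.DeltaSparkSessionExtension
    spark.conf.get("spark.sql.extensions")
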
spark-submit
  1. Connect to the primary node using SSH. For more information, see Connect to the primary node using SSH in the Amazon EMR Management Guide.

  2. Enter the following command to launch the Spark session for Delta Lake.

    spark-submit \
      --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension" \
      --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog"
EMR Studio notebooks

To initialize a Spark session using EMR Studio notebooks, configure your Spark session using the %%configure magic command in your EMR Studio notebook, as in the following example. For more information, see Use EMR Notebooks magics in the Amazon EMR Management Guide.

%%configure -f
{
  "conf": {
    "spark.sql.extensions": "io.delta.sql.DeltaSparkSessionExtension",
    "spark.sql.catalog.spark_catalog": "org.apache.spark.sql.delta.catalog.DeltaCatalog"
  }
}

Write to a Delta Lake table

The following example shows how to create a DataFrame and write it as a Delta Lake dataset. The example shows how to work with datasets using the Spark shell while connected to the primary node through SSH as the default hadoop user.

Note

To paste code samples into the Spark shell, type :paste at the prompt, paste the example, and then press CTRL + D.

PySpark

Spark includes a Python-based shell, pyspark, that you can use to prototype Spark programs written in Python. Just as with spark-shell, invoke pyspark on the primary node.

## Create a DataFrame
data = spark.createDataFrame([("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
                              ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
                              ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
                              ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")],
                             ["id", "creation_date", "last_update_time"])

## Write a DataFrame as a Delta Lake dataset to the S3 location
spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string)
USING delta LOCATION 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table'""")

data.writeTo("delta_table").append()
Scala
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions._

// Create a DataFrame
val data = Seq(("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
               ("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
               ("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
               ("103", "2015-01-01", "2015-01-01T13:51:40.519832Z")).toDF("id", "creation_date", "last_update_time")

// Write a DataFrame as a Delta Lake dataset to the S3 location
spark.sql("""CREATE TABLE IF NOT EXISTS delta_table (id string, creation_date string, last_update_time string)
USING delta LOCATION 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table'""")

data.write.format("delta").mode("append").saveAsTable("delta_table")
SQL
-- Create a Delta Lake table with the S3 location
CREATE TABLE delta_table(id string,
                         creation_date string,
                         last_update_time string)
USING delta LOCATION 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table';

-- Insert data into the table
INSERT INTO delta_table VALUES
("100", "2015-01-01", "2015-01-01T13:51:39.340396Z"),
("101", "2015-01-01", "2015-01-01T12:14:58.597216Z"),
("102", "2015-01-01", "2015-01-01T13:51:40.417052Z"),
("103", "2015-01-01", "2015-01-01T13:51:40.519832Z");

Read from a Delta Lake table

PySpark
ddf = spark.table("delta_table")
ddf.show()
Scala
val ddf = spark.table("delta_table")
ddf.show()
SQL
SELECT * FROM delta_table;
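
As an optional addition to these steps, Delta Lake's DESCRIBE HISTORY command lets you confirm the writes by inspecting the table's commit log:

-- Each write from the examples above appears as a commit
DESCRIBE HISTORY delta_table;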

Use a Delta Lake cluster with Spark and AWS Glue

To use the AWS Glue Data Catalog as the metastore for Delta Lake tables, create a cluster with the following steps. For information on specifying the Delta Lake classification using the AWS Command Line Interface, see Supply a configuration using the AWS Command Line Interface when you create a cluster or Supply a configuration using the Java SDK when you create a cluster.

Create a Delta Lake cluster
  1. Create a file, delta_configurations.json, with the following content:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}, {"Classification":"spark-hive-site", "Properties":{"hive.metastore.client.factory.class":"com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"}}]
  2. Create a cluster with the following configuration, replacing the example Amazon S3 bucket path and the subnet ID with your own.

    aws emr create-cluster --release-label emr-6.9.0 --applications Name=Spark --configurations file://delta_configurations.json --region us-east-1 --name My_Spark_Delta_Cluster --log-uri s3://DOC-EXAMPLE-BUCKET/ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0
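
With the spark-hive-site classification above, tables that you create from Spark are registered in the AWS Glue Data Catalog. The following PySpark sketch is a hypothetical smoke test (the database name, table name, and S3 path are placeholders), run from a shell launched with the Delta configuration shown earlier:

# Hypothetical smoke test: with Glue as the metastore, the objects created
# here should appear in the AWS Glue Data Catalog. Names are placeholders.
spark.sql("CREATE DATABASE IF NOT EXISTS delta_db")
spark.sql("""CREATE TABLE IF NOT EXISTS delta_db.delta_table (id string)
USING delta LOCATION 's3://DOC-EXAMPLE-BUCKET/example-prefix/db/delta_table'""")
spark.sql("SHOW TABLES IN delta_db").show()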

Use a Delta Lake cluster with Trino

Starting with Amazon EMR release 6.9.0, you can use Delta Lake with your Trino cluster.

In this tutorial, we will use the AWS CLI to work with Delta Lake on an Amazon EMR Trino cluster.

Create a Delta Lake cluster
  1. Create a file, delta_configurations.json, and set values for your chosen catalog. For example, if you want to use the Hive metastore as your catalog, your file should have the following content:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}, {"Classification":"trino-connector-delta", "Properties":{"hive.metastore.uri":"thrift://localhost:9083"}}]

    If you want to use the AWS Glue Data Catalog as your metastore, your file should have the following content:

    [{"Classification":"delta-defaults", "Properties":{"delta.enabled":"true"}}, {"Classification":"trino-connector-delta", "Properties":{"hive.metastore":"glue"}}]
  2. Create a cluster with the following configuration, replacing the example Amazon S3 bucket path and the subnet ID with your own.

    aws emr create-cluster --release-label emr-6.9.0 --applications Name=Trino --configurations file://delta_configurations.json --region us-east-1 --name My_Trino_Delta_Cluster --log-uri s3://DOC-EXAMPLE-BUCKET/ --instance-type m5.xlarge --instance-count 2 --service-role EMR_DefaultRole_V2 --ec2-attributes InstanceProfile=EMR_EC2_DefaultRole,SubnetId=subnet-1234567890abcdef0

Initialize Trino session for Delta Lake

To initialize a Trino session, run the following command:

trino-cli --catalog delta
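
Once the CLI connects, you can confirm that the delta catalog is available. This check is an optional addition to the original steps:

-- The delta catalog should appear in the output
SHOW CATALOGS;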

Write to a Delta Lake table

Create and write to your table with the following SQL commands:

SHOW SCHEMAS;

CREATE TABLE default.delta_table (id int, data varchar, category varchar)
WITH (location = 's3://DOC-EXAMPLE-BUCKET/<prefix>');

INSERT INTO default.delta_table VALUES
    (1, 'a', 'c1'),
    (2, 'b', 'c2'),
    (3, 'c', 'c3');

Read from a Delta Lake table

Read from your table with the following SQL command:

SELECT * FROM default.delta_table;
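
When you finish experimenting, you can terminate the cluster so it stops accruing charges. The cluster ID below is a placeholder; use the ID returned when you created the cluster:

aws emr terminate-clusters --cluster-ids j-XXXXXXXXXXXXX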