Utilisation de Delta Lake OSS avec EMR Serverless - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de Delta Lake OSS avec EMR Serverless

Amazon EMR versions 6.9.0 et supérieures

Note

Amazon EMR 7.0.0 et versions ultérieures utilisent Delta Lake 3.0.0, qui renomme le delta-core.jar fichier en. delta-spark.jar Si vous utilisez Amazon EMR 7.0.0 ou une version ultérieure, assurez-vous de le spécifier delta-spark.jar dans vos configurations.

Amazon EMR 6.9.0 et versions ultérieures incluent Delta Lake. Vous n'avez donc plus à emballer Delta Lake vous-même ni à fournir le --packages drapeau avec vos tâches EMR sans serveur.

  1. Lorsque vous soumettez des tâches EMR sans serveur, assurez-vous que vous disposez des propriétés de configuration suivantes et incluez les paramètres suivants dans le sparkSubmitParameters champ.

    --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
  2. Créez un local delta_sample.py pour tester la création et la lecture d'une table Delta.

    # delta_sample.py from pyspark.sql import SparkSession import uuid url = "s3://amzn-s3-demo-bucket/delta-lake/output/%s/" % str(uuid.uuid4()) spark = SparkSession.builder.appName("DeltaSample").getOrCreate() ## creates a Delta table and outputs to target S3 bucket spark.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket spark.read.format("delta").load(url).show
  3. À l'aide de AWS CLI, téléchargez le delta_sample.py fichier dans votre compartiment Amazon S3. Utilisez ensuite la start-job-run commande pour soumettre une tâche à une application EMR sans serveur existante.

    aws s3 cp delta_sample.py s3://amzn-s3-demo-bucket/code/ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --name emr-delta \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://amzn-s3-demo-bucket/code/delta_sample.py", "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" } }'

Pour utiliser les bibliothèques Python avec Delta Lake, vous pouvez ajouter la delta-core bibliothèque en l'empaquetant sous forme de dépendance ou en l'utilisant comme image personnalisée.

Vous pouvez également utiliser le SparkContext.addPyFile pour ajouter les bibliothèques Python à partir du delta-core JAR fichier :

import glob from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sparkContext.addPyFile(glob.glob("/usr/share/aws/delta/lib/delta-core_*.jar")[0])

Amazon EMR versions 6.8.0 et antérieures

Si vous utilisez Amazon EMR 6.8.0 ou une version antérieure, suivez ces étapes pour utiliser Delta Lake OSS avec vos applications EMR sans serveur.

  1. Pour créer une version open source de Delta Lake compatible avec la version de Spark sur votre application Amazon EMR Serverless, accédez au Delta GitHub et suivez les instructions.

  2. Téléchargez les bibliothèques Delta Lake dans un compartiment Amazon S3 de votre Compte AWS.

  3. Lorsque vous soumettez des tâches EMR sans serveur dans la configuration de l'application, incluez les JAR fichiers Delta Lake qui se trouvent désormais dans votre compartiment.

    --conf spark.jars=s3://amzn-s3-demo-bucket/jars/delta-core_2.12-1.1.0.jar
  4. Pour vous assurer que vous pouvez lire et écrire à partir d'une table Delta, exécutez un exemple de PySpark test.

    from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext, SparkSession import uuid conf = SparkConf() sc = SparkContext(conf=conf) sqlContext = HiveContext(sc) url = "s3://amzn-s3-demo-bucket/delta-lake/output/1.0.1/%s/" % str(uuid.uuid4()) ## creates a Delta table and outputs to target S3 bucket session.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket session.read.format("delta").load(url).show