Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation de Delta Lake OSS avec EMR Serverless
Amazon EMR versions 6.9.0 et supérieures
Note
Amazon EMR 7.0.0 et versions ultérieures utilisent Delta Lake 3.0.0, qui renomme le delta-core.jar
fichier en. delta-spark.jar
Si vous utilisez Amazon EMR 7.0.0 ou une version ultérieure, assurez-vous de le spécifier delta-spark.jar
dans vos configurations.
Amazon EMR 6.9.0 et versions ultérieures incluent Delta Lake. Vous n'avez donc plus à emballer Delta Lake vous-même ni à fournir le --packages
drapeau avec vos tâches EMR sans serveur.
-
Lorsque vous soumettez des tâches EMR sans serveur, assurez-vous que vous disposez des propriétés de configuration suivantes et incluez les paramètres suivants dans le
sparkSubmitParameters
champ.--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
-
Créez un local
delta_sample.py
pour tester la création et la lecture d'une table Delta.# delta_sample.py from pyspark.sql import SparkSession import uuid url = "s3://
amzn-s3-demo-bucket
/delta-lake/output/%s/" % str(uuid.uuid4()) spark = SparkSession.builder.appName("DeltaSample").getOrCreate() ## creates a Delta table and outputs to target S3 bucket spark.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket spark.read.format("delta").load(url).show -
À l'aide de AWS CLI, téléchargez le
delta_sample.py
fichier dans votre compartiment Amazon S3. Utilisez ensuite lastart-job-run
commande pour soumettre une tâche à une application EMR sans serveur existante.aws s3 cp delta_sample.py s3://
amzn-s3-demo-bucket
/code/ aws emr-serverless start-job-run \ --application-idapplication-id
\ --execution-role-arnjob-role-arn
\ --name emr-delta \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://amzn-s3-demo-bucket
/code/delta_sample.py", "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" } }'
Pour utiliser les bibliothèques Python avec Delta Lake, vous pouvez ajouter la delta-core
bibliothèque en l'empaquetant sous forme de dépendance ou en l'utilisant comme image personnalisée.
Vous pouvez également utiliser le SparkContext.addPyFile
pour ajouter les bibliothèques Python à partir du delta-core
JAR fichier :
import glob from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sparkContext.addPyFile(glob.glob("/usr/share/aws/delta/lib/delta-core_*.jar")[0])
Amazon EMR versions 6.8.0 et antérieures
Si vous utilisez Amazon EMR 6.8.0 ou une version antérieure, suivez ces étapes pour utiliser Delta Lake OSS avec vos applications EMR sans serveur.
-
Pour créer une version open source de Delta Lake
compatible avec la version de Spark sur votre application Amazon EMR Serverless, accédez au Delta GitHub et suivez les instructions. -
Téléchargez les bibliothèques Delta Lake dans un compartiment Amazon S3 de votre Compte AWS.
-
Lorsque vous soumettez des tâches EMR sans serveur dans la configuration de l'application, incluez les JAR fichiers Delta Lake qui se trouvent désormais dans votre compartiment.
--conf spark.jars=s3://
amzn-s3-demo-bucket
/jars/delta-core_2.12-1.1.0.jar -
Pour vous assurer que vous pouvez lire et écrire à partir d'une table Delta, exécutez un exemple de PySpark test.
from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext, SparkSession import uuid conf = SparkConf() sc = SparkContext(conf=conf) sqlContext = HiveContext(sc) url = "s3://
amzn-s3-demo-bucket
/delta-lake/output/1.0.1/%s/" % str(uuid.uuid4()) ## creates a Delta table and outputs to target S3 bucket session.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket session.read.format("delta").load(url).show