EMR サーバーレスOSSでの Delta Lake の使用 - Amazon EMR

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

EMR サーバーレスOSSでの Delta Lake の使用

Amazon EMRバージョン 6.9.0 以降

注記

Amazon EMR 7.0.0 以降では、Delta Lake 3.0.0 を使用してdelta-core.jarファイルの名前を に変更しますdelta-spark.jar。Amazon 7EMR.0.0 以降を使用する場合は、設定delta-spark.jarで を指定してください。

Amazon EMR 6.9.0 以降には Delta Lake が含まれているため、Delta Lake を自分でパッケージ化したり、EMRサーバーレスジョブで --packages フラグを指定したりする必要がありません。

  1. EMR サーバーレスジョブを送信するときは、次の設定プロパティがあり、 sparkSubmitParametersフィールドに次のパラメータが含まれていることを確認してください。

    --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
  2. Delta テーブルの作成と読み取りdelta_sample.pyをテストするローカル を作成します。

    # delta_sample.py from pyspark.sql import SparkSession import uuid url = "s3://DOC-EXAMPLE-BUCKET/delta-lake/output/%s/" % str(uuid.uuid4()) spark = SparkSession.builder.appName("DeltaSample").getOrCreate() ## creates a Delta table and outputs to target S3 bucket spark.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket spark.read.format("delta").load(url).show
  3. 以下を使用 AWS CLIで、delta_sample.pyファイルを Amazon S3 バケットにアップロードします。次に、 start-job-run コマンドを使用して、既存の EMR Serverless アプリケーションにジョブを送信します。

    aws s3 cp delta_sample.py s3://DOC-EXAMPLE-BUCKET/code/ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --name emr-delta \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://DOC-EXAMPLE-BUCKET/code/delta_sample.py", "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" } }'

Delta Lake で Python ライブラリを使用するには、ライブラリを依存関係としてパッケージ化するかカスタムイメージ として使用することでdelta-coreライブラリを追加できます。

または、 を使用して、 delta-core JAR ファイルから Python ライブラリSparkContext.addPyFileを追加することもできます。

import glob from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sparkContext.addPyFile(glob.glob("/usr/share/aws/delta/lib/delta-core_*.jar")[0])

Amazon EMRバージョン 6.8.0 以前

Amazon 6EMR.8.0 以前を使用している場合は、以下の手順に従ってサーバーレスアプリケーションで Delta Lake OSS EMR を使用します。

  1. Amazon EMR Serverless アプリケーションで Spark のバージョンと互換性のあるオープンソースバージョンの Delta Lake を構築するには、Delta GitHub に移動し、指示に従ってください。

  2. Delta Lake ライブラリを の Amazon S3 バケットにアップロードする AWS アカウント.

  3. アプリケーション設定でEMRサーバーレスジョブを送信するときは、バケットに現在存在する Delta Lake JAR ファイルを含めます。

    --conf spark.jars=s3://DOC-EXAMPLE-BUCKET/jars/delta-core_2.12-1.1.0.jar
  4. Delta テーブルとの間で読み書きを確実にできるようにするには、サンプル PySparkテストを実行します。

    from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext, SparkSession import uuid conf = SparkConf() sc = SparkContext(conf=conf) sqlContext = HiveContext(sc) url = "s3://DOC-EXAMPLE-BUCKET/delta-lake/output/1.0.1/%s/" % str(uuid.uuid4()) ## creates a Delta table and outputs to target S3 bucket session.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket session.read.format("delta").load(url).show