Mengimpor file dan pustaka Python ke Amazon Athena untuk Apache Spark - Amazon Athena

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Mengimpor file dan pustaka Python ke Amazon Athena untuk Apache Spark

Dokumen ini memberikan contoh cara mengimpor file dan pustaka Python ke Amazon Athena untuk Apache Spark.

Pertimbangan dan batasan

  • Versi Python - Saat ini, Athena untuk Spark menggunakan Python versi 3.9.16. Perhatikan bahwa paket Python sensitif terhadap versi Python minor.

  • Athena untuk arsitektur Spark — Athena untuk Spark menggunakan Amazon Linux 2 pada arsitektur. ARM64 Perhatikan bahwa beberapa pustaka Python tidak mendistribusikan binari untuk arsitektur ini.

  • Objek bersama biner (SOs) — Karena SparkContext addPyFilemetode ini tidak mendeteksi objek bersama biner, metode ini tidak dapat digunakan di Athena untuk Spark untuk menambahkan paket Python yang bergantung pada objek bersama.

  • Resilient Distributed Datasets (RDDs)RDDstidak didukung.

  • DataFrame.forEach — Metode PySpark DataFrame.foreach tidak didukung.

Contoh

Contoh menggunakan konvensi berikut.

  • Lokasi placeholder Amazon S3. s3://amzn-s3-demo-bucket Ganti ini dengan lokasi bucket S3 Anda sendiri.

  • Semua blok kode yang mengeksekusi dari shell Unix ditampilkan sebagai directory_name $. Misalnya, perintah ls dalam direktori /tmp dan outputnya ditampilkan sebagai berikut:

    /tmp $ ls

    Keluaran

    file1 file2

Mengimpor file teks untuk digunakan dalam perhitungan

Contoh di bagian ini menunjukkan cara mengimpor file teks untuk digunakan dalam perhitungan di buku catatan Anda di Athena untuk Spark.

Menambahkan file ke buku catatan setelah menulisnya ke direktori sementara lokal

Contoh berikut menunjukkan cara menulis file ke direktori sementara lokal, menambahkannya ke buku catatan, dan mengujinya.

import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

Keluaran

Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

Mengimpor file dari Amazon S3

Contoh berikut menunjukkan cara mengimpor file dari Amazon S3 ke notebook dan mengujinya.

Untuk mengimpor file dari Amazon S3 ke notebook
  1. Buat file bernama test.txt yang memiliki satu baris yang berisi nilai5.

  2. Tambahkan file ke ember di Amazon S3. Contoh ini menggunakan lokasis3://amzn-s3-demo-bucket.

  3. Gunakan kode berikut untuk mengimpor file ke buku catatan Anda dan menguji file tersebut.

    from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

    Keluaran

    Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

Menambahkan file Python

Contoh di bagian ini menunjukkan cara menambahkan file dan pustaka Python ke buku catatan Spark Anda di Athena.

Menambahkan file Python dan mendaftarkan UDF

Contoh berikut menunjukkan cara menambahkan file Python dari Amazon S3 ke notebook Anda dan mendaftarkan file. UDF

Untuk menambahkan file Python ke buku catatan Anda dan mendaftarkan UDF
  1. Menggunakan lokasi Amazon S3 Anda sendiri, buat file s3://amzn-s3-demo-bucket/file1.py dengan konten berikut:

    def xyz(input): return 'xyz - udf ' + str(input);
  2. Di lokasi S3 yang sama, buat file s3://amzn-s3-demo-bucket/file2.py dengan konten berikut:

    from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
  3. Di notebook Athena for Spark Anda, jalankan perintah berikut.

    sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)

    Keluaran

    Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+

Mengimpor file.zip Python

Anda dapat menggunakan Python addPyFile dan import metode untuk mengimpor file.zip Python ke buku catatan Anda.

catatan

.zipFile yang Anda impor ke Athena Spark mungkin hanya menyertakan paket Python. Misalnya, termasuk paket dengan file berbasis C tidak didukung.

Untuk mengimpor .zip file Python ke buku catatan
  1. Di komputer lokal Anda, di direktori desktop seperti\tmp, buat direktori yang disebutmoduletest.

  2. Di moduletest direktori, buat file bernama hello.py dengan konten berikut:

    def hi(input): return 'hi ' + str(input);
  3. Di direktori yang sama, tambahkan file kosong dengan nama__init__.py.

    Jika Anda mencantumkan isi direktori, mereka sekarang akan terlihat seperti berikut ini.

    /tmp $ ls moduletest __init__.py hello.py
  4. Gunakan zip perintah untuk menempatkan dua file modul ke dalam file bernamamoduletest.zip.

    moduletest $ zip -r9 ../moduletest.zip *
  5. Unggah .zip file ke bucket Anda di Amazon S3.

  6. Gunakan kode berikut untuk mengimpor .zip file Python ke buku catatan Anda.

    sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Keluaran

    Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+

Mengimpor dua versi pustaka Python sebagai modul terpisah

Contoh kode berikut menunjukkan cara menambahkan dan mengimpor dua versi pustaka Python yang berbeda dari lokasi di Amazon S3 sebagai dua modul terpisah. Kode menambahkan setiap file pustaka dari S3, mengimpornya, dan kemudian mencetak versi pustaka untuk memverifikasi impor.

sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)

Keluaran

3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)

Keluaran

3.17.6

Mengimpor file.zip Python dari PyPI

Contoh ini menggunakan pip perintah untuk men-download file.zip Python dari proyek bpabel/piglatin dari Python Package Index (PyPI).

Untuk mengimpor file.zip Python dari PyPI
  1. Di desktop lokal Anda, gunakan perintah berikut untuk membuat direktori yang disebut testpiglatin dan membuat lingkungan virtual.

    /tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .

    Keluaran

    created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
  2. Buat subdirektori bernama unpacked untuk menampung proyek.

    testpiglatin $ mkdir unpacked
  3. Gunakan pip perintah untuk menginstal proyek ke dalam unpacked direktori.

    testpiglatin $ bin/pip install -t $PWD/unpacked piglatin

    Keluaran

    Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
  4. Periksa isi direktori.

    testpiglatin $ ls

    Keluaran

    bin lib pyvenv.cfg unpacked
  5. Ubah ke unpacked direktori dan tampilkan isinya.

    testpiglatin $ cd unpacked unpacked $ ls

    Keluaran

    piglatin piglatin-1.0.6.dist-info
  6. Gunakan zip perintah untuk menempatkan isi proyek piglatin ke dalam file bernamalibrary.zip.

    unpacked $ zip -r9 ../library.zip *

    Keluaran

    adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
  7. (Opsional) Gunakan perintah berikut untuk menguji impor secara lokal.

    1. Atur jalur Python ke lokasi library.zip file dan mulai Python.

      /home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3

      Keluaran

      Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
    2. Impor pustaka dan jalankan perintah uji.

      >>> import piglatin >>> piglatin.translate('hello')

      Keluaran

      'ello-hay'
  8. Gunakan perintah seperti berikut ini untuk menambahkan .zip file dari Amazon S3, mengimpornya ke notebook Anda di Athena, dan mengujinya.

    sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Keluaran

    Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+

Mengimpor file.zip Python dari PyPI yang memiliki dependensi

Contoh ini mengimpor paket md2gemini, yang mengubah teks dalam penurunan harga ke format teks Gemini, dari PyPI. Paket ini memiliki dependensi berikut:

cjkwrap mistune wcwidth
Untuk mengimpor file.zip Python yang memiliki dependensi
  1. Di komputer lokal Anda, gunakan perintah berikut untuk membuat direktori yang disebut testmd2gemini dan membuat lingkungan virtual.

    /tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
  2. Buat subdirektori bernama unpacked untuk menampung proyek.

    testmd2gemini $ mkdir unpacked
  3. Gunakan pip perintah untuk menginstal proyek ke dalam unpacked direktori.

    /testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini

    Keluaran

    Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
  4. Ubah ke unpacked direktori dan periksa isinya.

    testmd2gemini $ cd unpacked unpacked $ ls -lah

    Keluaran

    total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
  5. Gunakan zip perintah untuk menempatkan isi proyek md2gemini ke dalam file bernama. md2gemini.zip

    unpacked $ zip -r9 ../md2gemini *

    Keluaran

    adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
  6. (Opsional) Gunakan perintah berikut untuk menguji apakah perpustakaan berfungsi di komputer lokal Anda.

    1. Atur jalur Python ke lokasi md2gemini.zip file dan mulai Python.

      /home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
    2. Impor pustaka dan jalankan pengujian.

      >>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))

      Keluaran

      https://abc.def abc
  7. Gunakan perintah berikut untuk menambahkan .zip file dari Amazon S3, mengimpornya ke notebook Anda di Athena, dan melakukan non test. UDF

    # (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))

    Keluaran

    Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
  8. Gunakan perintah berikut untuk melakukan UDF tes.

    # (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Keluaran

    Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+