Importazione di file e librerie Python in Amazon Athena per Apache Spark - Amazon Athena

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Importazione di file e librerie Python in Amazon Athena per Apache Spark

Questo documento fornisce esempi di come importare file e librerie Python in Amazon Athena per Apache Spark.

Considerazioni e limitazioni

  • Versione Python: attualmente, Athena for Spark utilizza la versione Python 3.9.16. Nota che i pacchetti Python sono sensibili alle versioni minori di Python.

  • Architettura Athena per Spark: Athena per Spark utilizza Amazon Linux 2 sull'architettura. ARM64 Nota che alcune librerie Python non distribuiscono file binari per questa architettura.

  • Oggetti binari condivisi (SOs) — Poiché il SparkContext addPyFilemetodo non rileva oggetti binari condivisi, non può essere usato in Athena per Spark per aggiungere pacchetti Python che dipendono da oggetti condivisi.

  • Resilient Distributed Dataset (RDDs): non sono supportati. RDDs

  • DataFrame.foreach — Il metodo .foreach non è supportato. PySpark DataFrame

Esempi

Gli esempi utilizzano le seguenti convenzioni.

  • Il segnaposto della posizione di Amazon S3 s3://amzn-s3-demo-bucket. Sostituisci questo valore con la posizione del tuo bucket S3.

  • Tutti i blocchi di codice eseguiti da una shell Unix sono mostrati come directory_name $. Ad esempio, il comando ls nella directory /tmp e il relativo output vengono visualizzati come segue:

    /tmp $ ls

    Output

    file1 file2

Importazione di file di testo da utilizzare nei calcoli

Gli esempi di questa sezione mostrano come importare file di testo da utilizzare nei calcoli nei notebook in Athena per Spark.

Aggiunta di un file a un notebook dopo averlo scritto nella directory temporanea locale

L'esempio seguente mostra come scrivere un file in una directory temporanea locale, aggiungerlo a un notebook e testarlo.

import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

Output

Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

Importazione di un file da Amazon S3

Il seguente esempio mostra come importare un file da Amazon S3 in un notebook e testarlo.

Importazione di un file da Amazon S3 in un notebook
  1. Crea un file denominato test.txt con una singola riga contenente il valore 5.

  2. Aggiungi il file a un bucket in Amazon S3. Questo esempio utilizza la posizione s3://amzn-s3-demo-bucket.

  3. Utilizza il codice seguente per importare il file sul tuo notebook e testarlo.

    from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

    Output

    Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

Aggiunta di file Python

Gli esempi in questa sezione mostrano come aggiungere file e librerie Python ai notebook Spark in Athena.

Aggiungere file Python e registrare un UDF

L'esempio seguente mostra come aggiungere file Python da Amazon S3 al notebook e registrare un. UDF

Per aggiungere file Python al tuo taccuino e registrare un UDF
  1. Utilizzando la tua posizione Amazon S3, crea il file s3://amzn-s3-demo-bucket/file1.py con i seguenti contenuti:

    def xyz(input): return 'xyz - udf ' + str(input);
  2. Nella stessa posizione S3, crea il file s3://amzn-s3-demo-bucket/file2.py con i seguenti contenuti:

    from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
  3. Nel tuo notebook Athena per Spark, esegui i seguenti comandi.

    sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)

    Output

    Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+

Importazione di un file .zip Python

Puoi utilizzare i metodi addPyFile e import di Python per importare un file .zip Python nel tuo notebook.

Nota

I file .zip importati in Athena Spark possono includere solo pacchetti Python. Ad esempio, l'inclusione di pacchetti con file basati su C non è supportata.

Importazione di un file .zip Python in un notebook
  1. Sul tuo computer locale, in una directory del desktop come ad esempio \tmp, crea una directory chiamata moduletest.

  2. Nella directory moduletest, creare un file denominato hello.py con il seguente contenuto:

    def hi(input): return 'hi ' + str(input);
  3. Nella stessa directory, aggiungi un file vuoto denominato __init__.py.

    Se elenchi i contenuti della directory, ora dovrebbero apparire come segue.

    /tmp $ ls moduletest __init__.py hello.py
  4. Utilizza il comando zip per inserire i due file del modulo in un file chiamato moduletest.zip.

    moduletest $ zip -r9 ../moduletest.zip *
  5. Carica il file .zip nel tuo bucket in Amazon S3.

  6. Utilizza il codice seguente per importare il file .zip Python nel notebook.

    sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Output

    Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+

Importazione di due versioni di una libreria Python come moduli separati

I seguenti esempi di codice mostrano come aggiungere e importare due versioni distinte di una libreria Python da una posizione in Amazon S3 come due moduli separati. Il codice aggiunge ogni file della libreria da S3, lo importa e quindi stampa la versione della libreria per verificare l'importazione.

sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)

Output

3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)

Output

3.17.6

Importazione di un file .zip Python da PyPI

Questo esempio utilizza il comando pip per scaricare un file .zip in Python del progetto bpabel/piglatin dal Python Package Index (PyPI).

Importazione di un file .zip Python da PyPI
  1. Sul desktop locale, utilizza i seguenti comandi per creare una directory chiamata testpiglatin e creare un ambiente virtuale.

    /tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .

    Output

    created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
  2. Crea una sottodirectory denominata unpacked per ospitare il progetto.

    testpiglatin $ mkdir unpacked
  3. Utilizza il comando pip per installare il progetto nella directory unpacked.

    testpiglatin $ bin/pip install -t $PWD/unpacked piglatin

    Output

    Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
  4. Verifica il contenuto della directory.

    testpiglatin $ ls

    Output

    bin lib pyvenv.cfg unpacked
  5. Passa alla directory unpacked e visualizza il contenuto.

    testpiglatin $ cd unpacked unpacked $ ls

    Output

    piglatin piglatin-1.0.6.dist-info
  6. Utilizza il comando zip per inserire il contenuto del progetto piglatin in un file denominato library.zip.

    unpacked $ zip -r9 ../library.zip *

    Output

    adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
  7. (Facoltativo) Utilizza i seguenti comandi per testare l'importazione localmente.

    1. Imposta il percorso Python nella posizione del file library.zip e avvia Python.

      /home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3

      Output

      Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
    2. Importa la libreria ed esegui un comando di test.

      >>> import piglatin >>> piglatin.translate('hello')

      Output

      'ello-hay'
  8. Utilizza i comandi seguenti per aggiungere il file .zip da Amazon S3, importarlo nel tuo notebook in Athena e testarlo.

    sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Output

    Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+

Importazione di un file .zip Python da PyPI con dipendenze

Questo esempio importa da PyPI il pacchetto md2gemini, che converte il testo in formato Markdown nel formato di testo Gemini. Il pacchetto include le seguenti dipendenze:

cjkwrap mistune wcwidth
Importazione da PyPI di un file .zip Python con dipendenze
  1. Sul computer locale, utilizza i seguenti comandi per creare una directory chiamata testmd2gemini e creare un ambiente virtuale.

    /tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
  2. Crea una sottodirectory denominata unpacked per ospitare il progetto.

    testmd2gemini $ mkdir unpacked
  3. Utilizza il comando pip per installare il progetto nella directory unpacked.

    /testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini

    Output

    Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
  4. Passa alla directory unpacked e controlla il contenuto.

    testmd2gemini $ cd unpacked unpacked $ ls -lah

    Output

    total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
  5. Utilizza il comando zip per inserire il contenuto del progetto md2gemini in un file denominato md2gemini.zip.

    unpacked $ zip -r9 ../md2gemini *

    Output

    adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
  6. (Facoltativo) Utilizza i seguenti comandi per verificare che la libreria funzioni sul computer locale.

    1. Imposta il percorso Python nella posizione del file md2gemini.zip e avvia Python.

      /home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
    2. Importa la libreria ed esegui un test.

      >>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))

      Output

      https://abc.def abc
  7. Usa i seguenti comandi per aggiungere il .zip file da Amazon S3, importarlo nel tuo notebook in Athena ed eseguire un test non eseguito. UDF

    # (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))

    Output

    Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
  8. Usa i seguenti comandi per eseguire un UDF test.

    # (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Output

    Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+