Importar arquivos e bibliotecas em Python para o Athena para Spark - Amazon Athena

Importar arquivos e bibliotecas em Python para o Athena para Spark

Este documento fornece exemplos de como importar arquivos e bibliotecas Python para o Amazon Athena para Apache Spark.

Condições e limitações

  • Versão Python: atualmente, o Athena para Spark usa o Python versão 3.9.16. Observe que os pacotes Python são sensíveis às versões secundárias do Python.

  • Arquitetura do Athena para Spark: o Athena para Spark usa o Amazon Linux 2 na arquitetura ARM64. Observe que algumas bibliotecas Python não distribuem binários para essa arquitetura.

  • Objetos compartilhados binários (SOs): como o método addPyFile do SparkContext não detecta objetos binários compartilhados, ele não pode ser usado no Athena para Spark para adicionar pacotes Python que dependam de objetos compartilhados.

  • Conjuntos de dados resilientes distribuídos (RDDs)RDDs não são compatíveis.

  • Dataframe.foreach: o método DataFrame.foreach do PySpark não é compatível.

Exemplos

Os exemplos usam as convenções a seguir:

  • O espaço reservado para s3://amzn-s3-demo-bucket no local do Amazon S3. Substitua isso pelo seu próprio local para buckets do S3.

  • Todos os blocos de código executados a partir de um shell Unix são apresentados como directory_name $. Por exemplo, o comando ls no diretório /tmp e sua saída são exibidos da seguinte forma:

    /tmp $ ls

    Saída

    file1 file2

Importar arquivos de texto para usar em cálculos

Os exemplos nesta seção mostram como importar arquivos de texto para uso em cálculos em seus cadernos no Athena para Spark.

O exemplo a seguir mostra como gravar um arquivo em um diretório temporário local, adicioná-lo a um caderno e testá-lo.

import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

Saída

Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

O exemplo a seguir mostra como importar um arquivo do Amazon S3 para um caderno e testá-lo.

Para importar um arquivo do Amazon S3 para um caderno
  1. Crie um arquivo denominado test.txt que tenha uma única linha com o valor 5.

  2. Adicione o arquivo a um bucket no Amazon S3. Este exemplo usa o local s3://amzn-s3-demo-bucket.

  3. Use o código a seguir para importar o arquivo para seu caderno e realizar o teste do arquivo.

    from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

    Saída

    Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

Adicionar arquivos em Python

Os exemplos nesta seção mostram como adicionar arquivos e bibliotecas Python aos seus cadernos Spark no Athena.

O exemplo a seguir mostra como adicionar arquivos Python do Amazon S3 ao seu caderno e registrar uma UDF.

Para adicionar arquivos Python ao seu caderno e registrar uma UDF
  1. Usando seu próprio local do Amazon S3, crie o arquivo s3://amzn-s3-demo-bucket/file1.py com o seguinte conteúdo:

    def xyz(input): return 'xyz - udf ' + str(input);
  2. No mesmo local do S3, crie o arquivo s3://amzn-s3-demo-bucket/file2.py com o seguinte conteúdo:

    from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
  3. Em seu caderno do Athena para Spark, execute os comandos a seguir.

    sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)

    Saída

    Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+

É possível usar os métodos addPyFile e import do Python para importar um arquivo .zip do Python para o seu caderno.

nota

Os arquivos .zip que você importa para o Athena Spark podem incluir somente pacotes Python. Por exemplo, a inclusão de pacotes com arquivos baseados em C não é compatível.

Para importar um arquivo .zip do Python para seu caderno
  1. Em seu computador local, em um diretório da área de trabalho, como \tmp, crie um diretório denominado moduletest.

  2. No diretório moduletest, crie um arquivo denominado hello.py com o conteúdo a seguir:

    def hi(input): return 'hi ' + str(input);
  3. No mesmo diretório, adicione um arquivo vazio com o nome __init__.py.

    Se você listar o conteúdo do diretório, ele deverá se parecer com o seguinte:

    /tmp $ ls moduletest __init__.py hello.py
  4. Use o comando zip para inserir os dois arquivos do módulo em um arquivo denominado moduletest.zip.

    moduletest $ zip -r9 ../moduletest.zip *
  5. Carregue o arquivo .zip em seu bucket no Amazon S3.

  6. Use o código a seguir para importar o arquivo .zip do Python para seu caderno.

    sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Saída

    Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+

Os exemplos de código a seguir mostram como adicionar e importar duas versões diferentes de uma biblioteca Python de um local no Amazon S3 como dois módulos separados. O código adicionará cada arquivo da biblioteca do S3, realizará a importação deles e, em seguida, imprimirá a versão da biblioteca para verificar a importação.

sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)

Saída

3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)

Saída

3.17.6

Este exemplo usa o comando pip para baixar de um arquivo .zip do Python referente ao projeto bpabel/piglatin do Python Package Index (PyPI).

Para importar um arquivo .zip do Python do PyPI
  1. Em sua área de trabalho local, use os comandos a seguir para criar um diretório denominado testpiglatin e criar um ambiente virtual.

    /tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .

    Saída

    created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
  2. Crie um subdiretório chamado unpacked para manter o projeto.

    testpiglatin $ mkdir unpacked
  3. Use o comando pip para instalar o projeto no diretório unpacked.

    testpiglatin $ bin/pip install -t $PWD/unpacked piglatin

    Saída

    Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
  4. Verifique o conteúdo do diretório.

    testpiglatin $ ls

    Saída

    bin lib pyvenv.cfg unpacked
  5. Altere para o diretório unpacked e exiba seu conteúdo.

    testpiglatin $ cd unpacked unpacked $ ls

    Saída

    piglatin piglatin-1.0.6.dist-info
  6. Use o comando zip para inserir o conteúdo do projeto piglatin em um arquivo denominado library.zip.

    unpacked $ zip -r9 ../library.zip *

    Saída

    adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
  7. (Opcional) Use os seguintes comandos para testar a importação em nível local.

    1. Defina o caminho do Python para o local do arquivo library.zip e inicie o Python.

      /home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3

      Saída

      Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
    2. Importe a biblioteca e execute um comando para teste.

      >>> import piglatin >>> piglatin.translate('hello')

      Saída

      'ello-hay'
  8. Use comandos, como apresentado a seguir, para adicionar o arquivo .zip do Amazon S3, importá-lo para seu caderno no Athena e testá-lo.

    sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Saída

    Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+

Este exemplo realiza a importação do pacote md2gemini, que converte texto em markdown para o formato de texto Gemini do PyPI. O pacote tem as seguintes dependências:

cjkwrap mistune wcwidth
Para importar um arquivo .zip do Python que tem dependências
  1. Em seu computador local, use os comandos a seguir para criar um diretório denominado testmd2gemini e criar um ambiente virtual.

    /tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
  2. Crie um subdiretório chamado unpacked para manter o projeto.

    testmd2gemini $ mkdir unpacked
  3. Use o comando pip para instalar o projeto no diretório unpacked.

    /testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini

    Saída

    Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
  4. Altere para o diretório unpacked e verifique seu conteúdo.

    testmd2gemini $ cd unpacked unpacked $ ls -lah

    Saída

    total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
  5. Use o comando zip para inserir o conteúdo do projeto md2gemini em um arquivo denominado md2gemini.zip.

    unpacked $ zip -r9 ../md2gemini *

    Saída

    adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
  6. (Opcional) Use os comandos a seguir para testar se a biblioteca funciona em seu computador local.

    1. Defina o caminho do Python para o local do arquivo md2gemini.zip e inicie o Python.

      /home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
    2. Importe a biblioteca e execute um teste.

      >>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))

      Saída

      https://abc.def abc
  7. Use os comandos a seguir para adicionar o arquivo .zip do Amazon S3, importá-lo para seu caderno no Athena e executar um teste que não seja UDF.

    # (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))

    Saída

    Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
  8. Use os comandos a seguir para executar um teste que seja UDF.

    # (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()

    Saída

    Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+