Athena for Spark로 파일 및 Python 라이브러리 가져오기 - Amazon Athena

Athena for Spark로 파일 및 Python 라이브러리 가져오기

이 문서에서는 파일 및 Python 라이브러리를 Apache Spark용 Amazon Athena로 가져오는 방법에 대한 예제를 제공합니다.

고려 사항 및 제한

  • Python 버전 – 현재 Athena for Spark는 Python 버전 3.9.16을 사용합니다. Python 패키지는 Python 마이너 버전에 민감합니다.

  • Athena for Spark 아키텍처 - Athena for Spark는 ARM64 아키텍처 기반 Amazon Linux 2를 사용합니다. 일부 Python 라이브러리는 이 아키텍처에 대한 바이너리를 배포하지 않습니다.

  • 바이너리 공유 객체(SO) – SparkContext addPyFile 메서드는 바이너리 공유 객체를 탐지하지 않으므로 Athena for Spark에서 공유 객체에 중속된 Python 패키지를 추가할 때 사용할 수 없습니다.

  • Resilient Distributed Dataset(RDD)RDD는 지원되지 않습니다.

  • Dataframe.foreach – PySpark DataFrame.foreach 메서드는 지원되지 않습니다.

예시

예제에서는 다음과 같은 규칙을 사용합니다.

  • 자리 표시자 Amazon S3 위치 s3://amzn-s3-demo-bucket. 사용자의 S3 버킷으로 대체합니다.

  • Unix 셸에서 실행되는 모든 코드 블록은 directory_name $으로 표시됩니다. 예를 들어, ls 디렉터리의 /tmp 명령과 해당 출력은 다음과 같이 표시됩니다.

    /tmp $ ls

    출력

    file1 file2

계산에 사용할 텍스트 파일 가져오기

이 단원의 예제에서는 Athena for Spark의 노트북에서 계산에 사용할 텍스트 파일을 가져오는 방법을 보여줍니다.

다음 예제에서는 로컬 임시 디렉터리에 파일을 쓴 후 노트북에 추가하고 테스트하는 방법을 보여줍니다.

import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

출력

Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

다음 예제에서는 Amazon S3에서 노트북으로 파일을 가져와서 테스트하는 방법을 보여줍니다.

Amazon S3에서 노트북으로 파일을 가져오려면
  1. 5가 포함된 한 줄로 구성된 test.txt라는 파일을 생성합니다.

  2. Amazon S3에서 버킷에 파일을 추가합니다. 이 예제에서는 s3://amzn-s3-demo-bucket 위치를 사용합니다.

  3. 다음 코드를 사용하여 파일을 노트북으로 가져온 다음 테스트합니다.

    from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()

    출력

    Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+

Python 파일 추가

이 단원의 예제에서는 Python 파일 및 라이브러리를 Athena의 Spark 노트북에 추가하는 방법을 보여줍니다.

다음 예제에서는 Amazon S3의 Python 파일을 노트북에 추가하고 UDF를 등록하는 방법을 보여줍니다.

Python 파일을 노트북에 추가하고 UDF를 등록하려면
  1. 사용자의 Amazon S3 위치를 사용하여 다음 콘텐츠가 포함된 s3://amzn-s3-demo-bucket/file1.py 파일을 생성합니다.

    def xyz(input): return 'xyz - udf ' + str(input);
  2. 동일한 S3 위치를 사용하여 다음 콘텐츠가 포함된 s3://amzn-s3-demo-bucket/file2.py 파일을 생성합니다.

    from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
  3. Athena for Spark 노트북에서 다음 명령을 실행합니다.

    sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)

    출력

    Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+

Python addPyFileimport 메서드를 사용하여 Python .zip 파일을 노트북으로 가져올 수 있습니다.

참고

Athena Spark로 가져오는 .zip 파일에는 Python 패키지만 포함될 수 있습니다. 예를 들어 C 기반 파일이 있는 패키지의 포함은 지원되지 않습니다.

Python .zip 파일을 노트북으로 가져오려면
  1. 로컬 컴퓨터의 데스크톱 디렉터리(예: \tmp)에 moduletest 디렉터리를 생성합니다.

  2. moduletest 디렉터리에 다음 콘텐츠로 hello.py라는 파일을 생성합니다.

    def hi(input): return 'hi ' + str(input);
  3. 동일한 디렉터리에서 이름이 __init__.py인 빈 파일을 추가합니다.

    이제 디렉터리 콘텐츠가 다음과 같이 나열됩니다.

    /tmp $ ls moduletest __init__.py hello.py
  4. zip 명령을 사용하여 두 모듈 파일을 moduletest.zip 파일에 배치합니다.

    moduletest $ zip -r9 ../moduletest.zip *
  5. Amazon S3의 버킷에 .zip 파일을 업로드합니다.

  6. 다음 코드를 사용하여 Python.zip 파일을 노트북으로 가져옵니다.

    sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()

    출력

    Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+

다음 코드 예제에서는 Amazon S3의 한 위치에서 두 가지 버전의 Python 라이브러리를 두 개별 모듈로 추가하고 가져오는 방법을 보여줍니다. 이 코드는 S3에서 각 라이브러리 파일을 추가하고 가져온 다음 라이브러리 버전을 인쇄하여 가져오기를 확인합니다.

sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)

출력

3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)

출력

3.17.6

이 예제에서는 pip 명령을 사용하여 Python 패키지 인덱스(PyPI)에서 bpabel/piglatin 프로젝트의 Python .zip 파일을 다운로드합니다.

PyPI에서 Python .zip 파일을 가져오려면
  1. 로컬 데스크톱에서 다음 명령을 사용하여 testpiglatin 디렉터리를 만들고 가상 환경을 생성합니다.

    /tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .

    출력

    created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
  2. 프로젝트를 보관할 unpacked 하위 디렉터리를 생성합니다.

    testpiglatin $ mkdir unpacked
  3. pip 명령을 사용하여 unpacked 디렉터리에 프로젝트를 설치합니다.

    testpiglatin $ bin/pip install -t $PWD/unpacked piglatin

    출력

    Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
  4. 디렉터리의 내용을 확인합니다.

    testpiglatin $ ls

    출력

    bin lib pyvenv.cfg unpacked
  5. unpacked 디렉터리로 변경하고 내용을 표시합니다.

    testpiglatin $ cd unpacked unpacked $ ls

    출력

    piglatin piglatin-1.0.6.dist-info
  6. zip 명령을 사용하여 piglatin 프로젝트의 내용을 library.zip 파일에 포함합니다.

    unpacked $ zip -r9 ../library.zip *

    출력

    adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
  7. (선택 사항) 다음 명령을 사용하여 로컬에서 가져오기를 테스트합니다.

    1. Python 경로를 library.zip 파일 위치로 설정하고 Python을 시작합니다.

      /home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3

      출력

      Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
    2. 라이브러리를 가져오고 테스트 명령을 실행합니다.

      >>> import piglatin >>> piglatin.translate('hello')

      출력

      'ello-hay'
  8. 다음과 같은 명령을 사용하여 Amazon S3에서 .zip 파일을 추가하고 Athena에 있는 노트북으로 파일을 가져와서 테스트합니다.

    sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()

    출력

    Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+

이 예제에서는 PyPI에서 마크다운의 텍스트를 Gemini 텍스트 형식으로 변환하는 md2gemini 패키지를 가져옵니다. 이 패키지는 다음에 대한 종속성을 가집니다.

cjkwrap mistune wcwidth
종속성이 있는 Python .zip 파일을 가져오려면
  1. 로컬 컴퓨터에서 다음 명령을 사용하여 testmd2gemini 디렉터리를 만들고 가상 환경을 생성합니다.

    /tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
  2. 프로젝트를 보관할 unpacked 하위 디렉터리를 생성합니다.

    testmd2gemini $ mkdir unpacked
  3. pip 명령을 사용하여 unpacked 디렉터리에 프로젝트를 설치합니다.

    /testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini

    출력

    Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
  4. unpacked 디렉토리로 변경하고 내용을 확인합니다.

    testmd2gemini $ cd unpacked unpacked $ ls -lah

    출력

    total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
  5. zip 명령을 사용하여 md2gemini 프로젝트의 내용을 md2gemini.zip 파일에 포함합니다.

    unpacked $ zip -r9 ../md2gemini *

    출력

    adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
  6. (선택 사항) 다음 명령을 사용하여 라이브러리가 로컬 컴퓨터에서 작동하는지 테스트합니다.

    1. Python 경로를 md2gemini.zip 파일 위치로 설정하고 Python을 시작합니다.

      /home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
    2. 라이브러리를 가져오고 테스트를 실행합니다.

      >>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))

      출력

      https://abc.def abc
  7. 다음 명령을 사용하여 Amazon S3에서 .zip 파일을 추가하고 Athena에 있는 노트북으로 파일을 가져와서 비 UDF 테스트를 수행합니다.

    # (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))

    출력

    Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
  8. 다음 명령을 사용하여 UDF 테스트를 수행합니다.

    # (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()

    출력

    Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+