Athena for Spark로 파일 및 Python 라이브러리 가져오기
이 문서에서는 파일 및 Python 라이브러리를 Apache Spark용 Amazon Athena로 가져오는 방법에 대한 예제를 제공합니다.
고려 사항 및 제한
-
Python 버전 – 현재 Athena for Spark는 Python 버전 3.9.16을 사용합니다. Python 패키지는 Python 마이너 버전에 민감합니다.
-
Athena for Spark 아키텍처 - Athena for Spark는 ARM64 아키텍처 기반 Amazon Linux 2를 사용합니다. 일부 Python 라이브러리는 이 아키텍처에 대한 바이너리를 배포하지 않습니다.
-
바이너리 공유 객체(SO) – SparkContext addPyFile
메서드는 바이너리 공유 객체를 탐지하지 않으므로 Athena for Spark에서 공유 객체에 중속된 Python 패키지를 추가할 때 사용할 수 없습니다. -
Resilient Distributed Dataset(RDD) – RDD
는 지원되지 않습니다. -
Dataframe.foreach – PySpark DataFrame.foreach
메서드는 지원되지 않습니다.
예시
예제에서는 다음과 같은 규칙을 사용합니다.
-
자리 표시자 Amazon S3 위치
s3://amzn-s3-demo-bucket
. 사용자의 S3 버킷으로 대체합니다. -
Unix 셸에서 실행되는 모든 코드 블록은
directory_name
$
으로 표시됩니다. 예를 들어,ls
디렉터리의/tmp
명령과 해당 출력은 다음과 같이 표시됩니다./tmp $ ls
출력
file1 file2
계산에 사용할 텍스트 파일 가져오기
이 단원의 예제에서는 Athena for Spark의 노트북에서 계산에 사용할 텍스트 파일을 가져오는 방법을 보여줍니다.
다음 예제에서는 로컬 임시 디렉터리에 파일을 쓴 후 노트북에 추가하고 테스트하는 방법을 보여줍니다.
import os from pyspark import SparkFiles tempdir = '/tmp/' path = os.path.join(tempdir, "test.txt") with open(path, "w") as testFile: _ = testFile.write("5") sc.addFile(path) def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
출력
Calculation completed.
+---+---+-------+
| _1| _2| col|
+---+---+-------+
| 1| a|[aaaaa]|
| 2| b|[bbbbb]|
+---+---+-------+
다음 예제에서는 Amazon S3에서 노트북으로 파일을 가져와서 테스트하는 방법을 보여줍니다.
Amazon S3에서 노트북으로 파일을 가져오려면
-
값
5
가 포함된 한 줄로 구성된test.txt
라는 파일을 생성합니다. -
Amazon S3에서 버킷에 파일을 추가합니다. 이 예제에서는
s3://amzn-s3-demo-bucket
위치를 사용합니다. -
다음 코드를 사용하여 파일을 노트북으로 가져온 다음 테스트합니다.
from pyspark import SparkFiles sc.addFile('s3://amzn-s3-demo-bucket/test.txt') def func(iterator): with open(SparkFiles.get("test.txt")) as testFile: fileVal = int(testFile.readline()) return [x * fileVal for x in iterator] #Test the file from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show()
출력
Calculation completed. +---+---+-------+ | _1| _2| col| +---+---+-------+ | 1| a|[aaaaa]| | 2| b|[bbbbb]| +---+---+-------+
Python 파일 추가
이 단원의 예제에서는 Python 파일 및 라이브러리를 Athena의 Spark 노트북에 추가하는 방법을 보여줍니다.
다음 예제에서는 Amazon S3의 Python 파일을 노트북에 추가하고 UDF를 등록하는 방법을 보여줍니다.
Python 파일을 노트북에 추가하고 UDF를 등록하려면
-
사용자의 Amazon S3 위치를 사용하여 다음 콘텐츠가 포함된
s3://amzn-s3-demo-bucket/file1.py
파일을 생성합니다.def xyz(input): return 'xyz - udf ' + str(input);
-
동일한 S3 위치를 사용하여 다음 콘텐츠가 포함된
s3://amzn-s3-demo-bucket/file2.py
파일을 생성합니다.from file1 import xyz def uvw(input): return 'uvw -> ' + xyz(input);
-
Athena for Spark 노트북에서 다음 명령을 실행합니다.
sc.addPyFile('s3://amzn-s3-demo-bucket/file1.py') sc.addPyFile('s3://amzn-s3-demo-bucket/file2.py') def func(iterator): from file2 import uvw return [uvw(x) for x in iterator] from pyspark.sql.functions import udf from pyspark.sql.functions import col udf_with_import = udf(func) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", udf_with_import(col('_2'))).show(10)
출력
Calculation started (calculation_id=1ec09e01-3dec-a096-00ea-57289cdb8ce7) in (session=c8c09e00-6f20-41e5-98bd-4024913d6cee). Checking calculation status... Calculation completed. +---+---+--------------------+ | _1| _2| col| +---+---+--------------------+ | 1 | a|[uvw -> xyz - ud... | | 2 | b|[uvw -> xyz - ud... | +---+---+--------------------+
Python addPyFile
및 import
메서드를 사용하여 Python .zip 파일을 노트북으로 가져올 수 있습니다.
참고
Athena Spark로 가져오는 .zip
파일에는 Python 패키지만 포함될 수 있습니다. 예를 들어 C 기반 파일이 있는 패키지의 포함은 지원되지 않습니다.
Python .zip
파일을 노트북으로 가져오려면
-
로컬 컴퓨터의 데스크톱 디렉터리(예:
\tmp
)에moduletest
디렉터리를 생성합니다. -
moduletest
디렉터리에 다음 콘텐츠로hello.py
라는 파일을 생성합니다.def hi(input): return 'hi ' + str(input);
-
동일한 디렉터리에서 이름이
__init__.py
인 빈 파일을 추가합니다.이제 디렉터리 콘텐츠가 다음과 같이 나열됩니다.
/tmp $ ls moduletest __init__.py hello.py
-
zip
명령을 사용하여 두 모듈 파일을moduletest.zip
파일에 배치합니다.moduletest $ zip -r9 ../moduletest.zip *
-
Amazon S3의 버킷에
.zip
파일을 업로드합니다. -
다음 코드를 사용하여 Python
.zip
파일을 노트북으로 가져옵니다.sc.addPyFile('s3://amzn-s3-demo-bucket/moduletest.zip') from moduletest.hello import hi from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(hi) df = spark.createDataFrame([(1, "a"), (2, "b")]) df.withColumn("col", hi_udf(col('_2'))).show()
출력
Calculation started (calculation_id=6ec09e8c-6fe0-4547-5f1b-6b01adb2242c) in (session=dcc09e8c-3f80-9cdc-bfc5-7effa1686b76). Checking calculation status... Calculation completed. +---+---+----+ | _1| _2| col| +---+---+----+ | 1| a|hi a| | 2| b|hi b| +---+---+----+
다음 코드 예제에서는 Amazon S3의 한 위치에서 두 가지 버전의 Python 라이브러리를 두 개별 모듈로 추가하고 가져오는 방법을 보여줍니다. 이 코드는 S3에서 각 라이브러리 파일을 추가하고 가져온 다음 라이브러리 버전을 인쇄하여 가져오기를 확인합니다.
sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_15.zip') sc.addPyFile('s3://amzn-s3-demo-bucket/python-third-party-libs-test/simplejson_v3_17_6.zip') import simplejson_v3_15 print(simplejson_v3_15.__version__)
출력
3.15.0
import simplejson_v3_17_6 print(simplejson_v3_17_6.__version__)
출력
3.17.6
이 예제에서는 pip
명령을 사용하여 Python 패키지 인덱스(PyPI)
PyPI에서 Python .zip 파일을 가져오려면
-
로컬 데스크톱에서 다음 명령을 사용하여
testpiglatin
디렉터리를 만들고 가상 환경을 생성합니다./tmp $ mkdir testpiglatin /tmp $ cd testpiglatin testpiglatin $ virtualenv .
출력
created virtual environment CPython3.9.6.final.0-64 in 410ms creator CPython3Posix(dest=/private/tmp/testpiglatin, clear=False, no_vcs_ignore=False, global=False) seeder FromAppData(download=False, pip=bundle, setuptools=bundle, wheel=bundle, via=copy, app_data_dir=/Users/user1/Library/Application Support/virtualenv) added seed packages: pip==22.0.4, setuptools==62.1.0, wheel==0.37.1 activators BashActivator,CShellActivator,FishActivator,NushellActivator,PowerShellActivator,PythonActivator
-
프로젝트를 보관할
unpacked
하위 디렉터리를 생성합니다.testpiglatin $ mkdir unpacked
-
pip
명령을 사용하여unpacked
디렉터리에 프로젝트를 설치합니다.testpiglatin $ bin/pip install -t $PWD/unpacked piglatin
출력
Collecting piglatin Using cached piglatin-1.0.6-py2.py3-none-any.whl (3.1 kB) Installing collected packages: piglatin Successfully installed piglatin-1.0.6
-
디렉터리의 내용을 확인합니다.
testpiglatin $ ls
출력
bin lib pyvenv.cfg unpacked
-
unpacked
디렉터리로 변경하고 내용을 표시합니다.testpiglatin $ cd unpacked unpacked $ ls
출력
piglatin piglatin-1.0.6.dist-info
-
zip
명령을 사용하여 piglatin 프로젝트의 내용을library.zip
파일에 포함합니다.unpacked $ zip -r9 ../library.zip *
출력
adding: piglatin/ (stored 0%) adding: piglatin/__init__.py (deflated 56%) adding: piglatin/__pycache__/ (stored 0%) adding: piglatin/__pycache__/__init__.cpython-39.pyc (deflated 31%) adding: piglatin-1.0.6.dist-info/ (stored 0%) adding: piglatin-1.0.6.dist-info/RECORD (deflated 39%) adding: piglatin-1.0.6.dist-info/LICENSE (deflated 41%) adding: piglatin-1.0.6.dist-info/WHEEL (deflated 15%) adding: piglatin-1.0.6.dist-info/REQUESTED (stored 0%) adding: piglatin-1.0.6.dist-info/INSTALLER (stored 0%) adding: piglatin-1.0.6.dist-info/METADATA (deflated 48%)
-
(선택 사항) 다음 명령을 사용하여 로컬에서 가져오기를 테스트합니다.
-
Python 경로를
library.zip
파일 위치로 설정하고 Python을 시작합니다./home $ PYTHONPATH=/tmp/testpiglatin/library.zip /home $ python3
출력
Python 3.9.6 (default, Jun 29 2021, 06:20:32) [Clang 12.0.0 (clang-1200.0.32.29)] on darwin Type "help", "copyright", "credits" or "license" for more information.
-
라이브러리를 가져오고 테스트 명령을 실행합니다.
>>> import piglatin >>> piglatin.translate('hello')
출력
'ello-hay'
-
-
다음과 같은 명령을 사용하여 Amazon S3에서
.zip
파일을 추가하고 Athena에 있는 노트북으로 파일을 가져와서 테스트합니다.sc.addPyFile('s3://amzn-s3-demo-bucket/library.zip') import piglatin piglatin.translate('hello') from pyspark.sql.functions import udf from pyspark.sql.functions import col hi_udf = udf(piglatin.translate) df = spark.createDataFrame([(1, "hello"), (2, "world")]) df.withColumn("col", hi_udf(col('_2'))).show()
출력
Calculation started (calculation_id=e2c0a06e-f45d-d96d-9b8c-ff6a58b2a525) in (session=82c0a06d-d60e-8c66-5d12-23bcd55a6457). Checking calculation status... Calculation completed. +---+-----+--------+ | _1| _2| col| +---+-----+--------+ | 1|hello|ello-hay| | 2|world|orld-way| +---+-----+--------+
이 예제에서는 PyPI에서 마크다운의 텍스트를 Gemini
cjkwrap mistune wcwidth
종속성이 있는 Python .zip 파일을 가져오려면
-
로컬 컴퓨터에서 다음 명령을 사용하여
testmd2gemini
디렉터리를 만들고 가상 환경을 생성합니다./tmp $ mkdir testmd2gemini /tmp $ cd testmd2gemini testmd2gemini$ virtualenv .
-
프로젝트를 보관할
unpacked
하위 디렉터리를 생성합니다.testmd2gemini $ mkdir unpacked
-
pip
명령을 사용하여unpacked
디렉터리에 프로젝트를 설치합니다./testmd2gemini $ bin/pip install -t $PWD/unpacked md2gemini
출력
Collecting md2gemini Downloading md2gemini-1.9.0-py3-none-any.whl (31 kB) Collecting wcwidth Downloading wcwidth-0.2.5-py2.py3-none-any.whl (30 kB) Collecting mistune<3,>=2.0.0 Downloading mistune-2.0.2-py2.py3-none-any.whl (24 kB) Collecting cjkwrap Downloading CJKwrap-2.2-py2.py3-none-any.whl (4.3 kB) Installing collected packages: wcwidth, mistune, cjkwrap, md2gemini Successfully installed cjkwrap-2.2 md2gemini-1.9.0 mistune-2.0.2 wcwidth-0.2.5 ...
-
unpacked
디렉토리로 변경하고 내용을 확인합니다.testmd2gemini $ cd unpacked unpacked $ ls -lah
출력
total 16 drwxr-xr-x 13 user1 wheel 416B Jun 7 18:43 . drwxr-xr-x 8 user1 wheel 256B Jun 7 18:44 .. drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 CJKwrap-2.2.dist-info drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 __pycache__ drwxr-xr-x 3 user1 staff 96B Jun 7 18:43 bin -rw-r--r-- 1 user1 staff 5.0K Jun 7 18:43 cjkwrap.py drwxr-xr-x 7 user1 staff 224B Jun 7 18:43 md2gemini drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 md2gemini-1.9.0.dist-info drwxr-xr-x 12 user1 staff 384B Jun 7 18:43 mistune drwxr-xr-x 8 user1 staff 256B Jun 7 18:43 mistune-2.0.2.dist-info drwxr-xr-x 16 user1 staff 512B Jun 7 18:43 tests drwxr-xr-x 10 user1 staff 320B Jun 7 18:43 wcwidth drwxr-xr-x 9 user1 staff 288B Jun 7 18:43 wcwidth-0.2.5.dist-info
-
zip
명령을 사용하여 md2gemini 프로젝트의 내용을md2gemini.zip
파일에 포함합니다.unpacked $ zip -r9 ../md2gemini *
출력
adding: CJKwrap-2.2.dist-info/ (stored 0%) adding: CJKwrap-2.2.dist-info/RECORD (deflated 37%) .... adding: wcwidth-0.2.5.dist-info/INSTALLER (stored 0%) adding: wcwidth-0.2.5.dist-info/METADATA (deflated 62%)
-
(선택 사항) 다음 명령을 사용하여 라이브러리가 로컬 컴퓨터에서 작동하는지 테스트합니다.
-
Python 경로를
md2gemini.zip
파일 위치로 설정하고 Python을 시작합니다./home $ PYTHONPATH=/tmp/testmd2gemini/md2gemini.zip /home python3
-
라이브러리를 가져오고 테스트를 실행합니다.
>>> from md2gemini import md2gemini >>> print(md2gemini('[abc](https://abc.def)'))
출력
https://abc.def abc
-
-
다음 명령을 사용하여 Amazon S3에서
.zip
파일을 추가하고 Athena에 있는 노트북으로 파일을 가져와서 비 UDF 테스트를 수행합니다.# (non udf test) sc.addPyFile('s3://amzn-s3-demo-bucket/md2gemini.zip') from md2gemini import md2gemini print(md2gemini('[abc](https://abc.def)'))
출력
Calculation started (calculation_id=0ac0a082-6c3f-5a8f-eb6e-f8e9a5f9bc44) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. => https://abc.def (https://abc.def/) abc
-
다음 명령을 사용하여 UDF 테스트를 수행합니다.
# (udf test) from pyspark.sql.functions import udf from pyspark.sql.functions import col from md2gemini import md2gemini hi_udf = udf(md2gemini) df = spark.createDataFrame([(1, "[first website](https://abc.def)"), (2, "[second website](https://aws.com)")]) df.withColumn("col", hi_udf(col('_2'))).show()
출력
Calculation started (calculation_id=60c0a082-f04d-41c1-a10d-d5d365ef5157) in (session=36c0a082-5338-3755-9f41-0cc954c55b35). Checking calculation status... Calculation completed. +---+--------------------+--------------------+ | _1| _2| col| +---+--------------------+--------------------+ | 1|[first website](h...|=> https://abc.de...| | 2|[second website](...|=> https://aws.co...| +---+--------------------+--------------------+