함수 호출 - 아마존 SageMaker

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

함수 호출

@remote 데코레이터 내에서 함수를 호출하려면 다음 방법 중 하나를 사용하세요.

@remote 데코레이터 메서드를 사용하여 함수를 호출하는 경우 훈련 작업은 함수가 완료될 때까지 기다렸다가 새 작업을 시작합니다. 그러나 를 사용하는 경우 둘 이상의 작업을 병렬로 실행할 수 있습니다. RemoteExecutor API 다음 섹션에서는 함수를 호출하는 두 가지 방법을 모두 보여줍니다.

@remote 데코레이터를 사용하여 함수를 호출

@remote 데코레이터를 사용하여 함수에 주석을 달 수 있습니다. SageMaker 데코레이터 내부의 코드를 학습 작업으로 변환합니다. SageMaker 그러면 훈련 작업은 데코레이터 내에서 함수를 호출하고 작업이 완료될 때까지 기다립니다. 다음 코드 예제는 필수 라이브러리를 가져오고, SageMaker 인스턴스를 시작하고, @remote 데코레이터로 행렬 곱셈에 주석을 다는 방법을 보여줍니다.

from sagemaker.remote_function import remote import numpy as np @remote(instance_type="ml.m5.large") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()

데코레이터는 다음과 같이 정의됩니다.

def remote( *, **kwarg): ...

데코레이션된 함수를 호출하면 SageMaker Python은 오류로 인해 발생한 모든 예외를 로컬 메모리로 SDK 로드합니다. 다음 코드 예제에서는 divide 함수에 대한 첫 번째 호출이 성공적으로 완료되고 그 결과가 로컬 메모리로 로드됩니다. divide 함수를 두 번째로 호출하면 코드가 오류를 반환하고 이 오류가 로컬 메모리로 로드됩니다.

from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
참고

데코레이팅된 함수는 원격 작업으로 실행됩니다. 스레드가 중단되더라도 기본 작업은 중지되지 않습니다.

로컬 변수 값을 변경하는 방법

데코레이터 함수는 원격 머신에서 실행됩니다. 데코레이팅된 함수 내에서 로컬이 아닌 변수 또는 입력 인수를 변경해도 로컬 값은 변경되지 않습니다.

다음 코드 예제에서는 데코레이터 함수 내에 목록과 딕셔너리가 추가됩니다. 이는 데코레이터 함수가 호출될 때 변경되지 않습니다.

a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}

데코레이터 함수 내에서 선언된 로컬 변수 값을 변경하려면 함수에서 변수를 반환하세요. 다음 코드 예제는 함수에서 반환되는 로컬 변수 값이 변경되는 것을 보여줍니다.

a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}

데이터 직렬화 및 역직렬화

원격 함수를 호출하면 는 입력 및 출력 단계에서 함수 인수를 SageMaker 자동으로 직렬화합니다. 함수 인수와 반환은 cloudpickle을 사용하여 직렬화됩니다. SageMaker 다음 Python 객체 및 함수의 직렬화를 지원합니다.

  • 딕셔너리, 리스트, 부동 소수, 정수, 문자열, 부울 값, 튜플 등 내장된 Python 객체

  • Numpy 배열

  • Pandas Dataframe

  • Scikit-learn 데이터 세트 및 예측기

  • PyTorch 모델

  • TensorFlow 모델

  • 부스터 클래스의 경우 XGBoost

다음은 몇 가지 제한 사항과 함께 사용 가능합니다.

  • 다스크 DataFrames

  • XGBoost디매트릭스 클래스

  • TensorFlow 데이터셋 및 서브클래스

  • PyTorch 모델

다음 섹션에는 원격 함수에 몇 가지 제한이 있는 이전 Python 클래스를 사용하는 모범 사례, 직렬화된 데이터를 SageMaker 저장하는 위치 및 데이터에 대한 액세스를 관리하는 방법에 대한 정보가 포함되어 있습니다.

원격 데이터 직렬화 지원이 제한된 Python 클래스 모범 사례

이 섹션에 나열된 Python 클래스는 제한 사항과 함께 사용할 수 있습니다. 다음 섹션에서는 다음과 같은 Python 클래스를 사용하는 방법에 대한 모범 사례를 설명합니다.

  • Dask DataFrames

  • 더 클래스 XGBoost DMatric

  • TensorFlow 데이터세트와 서브클래스

  • PyTorch 모델

Dask는 Python에서 병렬 컴퓨팅에 사용되는 오픈 소스 라이브러리입니다. 이 섹션에서는 다음을 보여줍니다.

  • Dask를 원격 DataFrame 함수에 전달하는 방법

  • Dask의 요약 통계를 DataFrame Pandas로 변환하는 방법 DataFrame

원격 함수에 DataFrame Dask를 전달하는 방법

DataFramesDask는 사용 가능한 것보다 많은 메모리를 필요로 하는 데이터세트를 보관할 수 있기 때문에 대용량 데이터세트를 처리하는 데 자주 사용됩니다. Dask는 로컬 데이터를 메모리로 로드하지 DataFrame 않기 때문입니다. DataFrameDask를 함수 인수로 원격 함수에 전달하면 Dask가 데이터 자체 대신 로컬 디스크 또는 클라우드 저장소의 데이터에 대한 참조를 전달할 수 있습니다. 다음 코드는 빈 함수에서 작동하는 Dask를 원격 함수 DataFrame 내에 전달하는 예를 보여줍니다. DataFrame

#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...

Dask는 를 사용할 때만 Dask의 데이터를 DataFrame 메모리로 로드합니다. DataFrame 원격 함수 DataFrame 내에서 Dask를 사용하려면 데이터 경로를 제공하세요. 그러면 Dask는 코드 실행 시 지정한 데이터 경로에서 직접 데이터 세트를 읽습니다.

다음 코드 예제는 원격 함수 DataFrame 내에서 Dask를 사용하는 방법을 보여줍니다. clean 코드 예제에서는 DataFrame Dask 대신 clean으로 raw_data_path 전달됩니다. 코드가 실행되면 raw_data_path에 지정된 Amazon S3 버킷의 위치에서 데이터 세트를 직접 읽습니다. 그러면 persist 함수는 후속 함수를 쉽게 수행할 수 있도록 데이터세트를 메모리에 보관하고 DataFrame API Dask random_split 함수를 사용하여 S3 버킷의 출력 데이터 경로에 다시 씁니다.

import dask.dataframe as dd @remote( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/", "s3://amzn-s3-demo-bucket/cleaned/", split_ratio=[0.7, 0.3])
DataFrameDask의 요약 통계를 Pandas로 변환하는 방법 DataFrame

다음 예제 코드와 같이 compute 메서드를 DataFrame 호출하여 Dask의 요약 통계를 Pandas로 DataFrame 변환할 수 있습니다. 이 예제에서 S3 버킷에는 메모리나 Pandas 데이터프레임에 담을 수 DataFrame 없는 대용량 Dask가 포함되어 있습니다. 다음 예제에서 원격 함수는 데이터 세트를 스캔하고 출력 통계가 DataFrame 포함된 Dask를 Pandas에 반환합니다. describe DataFrame

executor = RemoteExecutor( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/").describe().compute()) future.result()

DMatrix에서 데이터를 XGBoost 로드하는 데 사용하는 내부 데이터 구조입니다. 연산 세션 간에 쉽게 이동할 수 있도록 DMatrix 객체를 피클링할 수는 없습니다. DMatrix인스턴스를 직접 전달하면 SerializationError a와 함께 실패합니다.

원격 함수에 데이터 객체를 전달하고 다음과 같이 훈련하는 방법 XGBoost

Pandas를 DMatrix 인스턴스로 DataFrame 변환하여 원격 함수 학습에 사용하려면 다음 코드 예제와 같이 원격 함수에 직접 전달하십시오.

import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)

TensorFlow 데이터세트와 서브클래스는 훈련 중에 데이터를 TensorFlow 로드하는 데 사용되는 내부 개체입니다. TensorFlow 연산 세션 간에 쉽게 이동할 수 있도록 데이터세트와 서브클래스를 피클링할 수 없습니다. Tensorflow 데이터 세트 또는 하위 클래스를 직접 전달하면 SerializationError로 실패하게 됩니다. 다음 코드 예제와 같이 Tensorflow APIs I/O를 사용하여 스토리지에서 데이터를 로드하세요.

import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("s3://amzn-s3-demo-bucket/data", {})

PyTorch 모델은 직렬화가 가능하며 로컬 환경과 원격 함수 간에 전달될 수 있습니다. 로컬 환경과 원격 환경의 장치 유형이 서로 다른 경우 (GPUs및CPUs) 와 같이 학습된 모델을 로컬 환경에 반환할 수 없습니다. 예를 들어 다음 코드를 로컬 환경에서 GPUs 개발했지만 를 사용하는 GPUs 인스턴스에서 실행하는 경우 학습된 모델을 직접 반환하면 a가 발생합니다DeserializationError.

# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU

GPU환경에서 학습된 모델을 CPU 기능만 포함된 모델로 되돌리려면 아래 코드 예제와 같이 PyTorch 모델 I/O를 APIs 직접 사용하십시오.

import s3fs model_path = "s3://amzn-s3-demo-bucket/folder/" @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))

직렬화된 데이터를 SageMaker 저장하는 위치

원격 함수를 호출하면 는 입력 및 출력 단계에서 함수 인수와 반환 값을 SageMaker 자동으로 직렬화합니다. 직렬화된 이 데이터는 S3 버킷의 루트 디렉터리에 저장됩니다. 구성 파일에 루트 디렉터리 <s3_root_uri>를 지정합니다. 파라미터 job_name이 자동으로 생성됩니다.

루트 디렉터리 아래에 현재 작업 디렉터리, 직렬화된 함수, 직렬화된 함수의 인수, 결과 및 직렬화된 함수 호출로 인해 발생한 모든 예외가 들어 있는 <job_name> 폴더를 SageMaker 만듭니다.

<job_name> 아래 디렉터리 workdir에는 현재 작업 디렉터리의 압축 아카이브가 들어 있습니다. 압축 아카이브에는 작업 디렉터리의 모든 Python 파일과 requirements.txt 파일이 포함되어 있는데 이는 원격 함수를 실행하는 데 필요한 종속성을 지정합니다.

다음은 구성 파일에 지정하는 S3 버킷의 폴더 구조 예제입니다.

<s3_root_uri>/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function

S3 버킷에 지정하는 루트 디렉터리는 장기 스토리지용이 아닙니다. 직렬화된 데이터는 직렬화 중에 사용된 Python 버전 및 기계 학습(ML) 프레임워크 버전과 밀접하게 연결되어 있습니다. Python 버전 또는 ML 프레임워크를 업그레이드하면 직렬화된 데이터를 사용하지 못할 수 있습니다. 대신 다음을 수행합니다.

  • Python 버전과 ML 프레임워크에 구애받지 않는 형식으로 모델 및 모델 아티팩트를 저장합니다.

  • Python 또는 ML 프레임워크를 업그레이드하는 경우 장기 스토리지에서 모델 결과에 액세스할 수 있습니다.

중요

지정된 시간이 지난 후 직렬화된 데이터를 삭제하려면 S3 버킷에 수명 구성을 설정합니다.

참고

Python pickle 모듈로 직렬화된 파일은 Parquet 및 를 포함한 CSV 다른 데이터 형식보다 이식성이 떨어질 수 있습니다. JSON 출처를 알 수 없는 피클된 파일을 로드하지 않도록 주의하세요.

원격 함수의 구성 파일에 포함할 내용에 대한 추가 정보는 구성 파일을 참조하세요.

직렬화된 데이터에 대한 액세스

관리자는 직렬화된 데이터 설정을 제공할 수 있으며 여기에는 구성 파일에서의 위치 및 모든 암호화 설정이 포함됩니다. 기본적으로 직렬화된 데이터는 다음을 사용하여 암호화됩니다. AWS Key Management Service (AWS KMS) 키. 또한 관리자는 버킷 정책을 사용하여 구성 파일에 지정한 루트 디렉터리의 액세스를 제한할 수 있습니다. 구성 파일은 여러 프로젝트 및 작업에서 공유하며 사용할 수 있습니다. 자세한 정보는 구성 파일을 참조하세요.

RemoteExecutor API 사용하여 함수를 호출합니다.

를 사용하여 함수를 RemoteExecutor API 호출할 수 있습니다. SageMaker SDKPython은 RemoteExecutor 호출 내의 코드를 SageMaker 학습 작업으로 변환합니다. 그러면 훈련 작업에서 함수를 비동기식 작업으로 호출하고 퓨처를 반환합니다. 를 사용하는 경우 둘 이상의 교육 작업을 병렬로 실행할 수 있습니다. RemoteExecutor API Python의 퓨처에 대한 자세한 내용은 Future를 참조하세요.

다음 코드 예제는 필수 라이브러리를 가져오고, 함수를 정의하고, SageMaker 인스턴스를 시작하고, 를 사용하여 요청을 API 제출하여 2 작업을 병렬로 실행하는 방법을 보여줍니다.

from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="ml.m5.large") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()

RemoteExecutor 클래스는 Concurrent.Futures.Executor 라이브러리를 구현한 것입니다.

다음 코드 예제에서는 RemoteExecutorAPI를 사용하여 함수를 정의하고 호출하는 방법을 보여줍니다. 이 예제에서 RemoteExecutor는 총 4개 작업을 제출하지만 2개만 병렬로 제출합니다. 마지막 두 개 작업은 오버헤드를 최소화하면서 클러스터를 재사용합니다.

from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())

max_parallel_job 파라미터는 컴퓨팅 리소스 할당을 최적화하지 않고 속도 제한 메커니즘으로만 사용됩니다. 이전 코드 예제에서 RemoteExecutor는 작업이 제출되기 전에 두 개의 병렬 작업을 위한 컴퓨팅 리소스를 예약하지 않습니다. max_parallel_job 또는 @remote 데코레이터의 기타 파라미터에 대한 자세한 내용은 원격 함수 클래스 및 메서드 사양을 참조하세요.

다음 클래스의 미래 클래스 RemoteExecutor API

퓨처 클래스는 반환 함수가 비동기식으로 호출될 때 훈련 작업의 반환 함수를 나타내는 퍼블릭 클래스입니다. 퓨처 클래스는 Concurrent.futures.future 클래스를 구현합니다. 이 클래스는 기본 작업을 수행하고 데이터를 메모리로 로드하는 데 사용할 수 있습니다.