기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
@remote 데코레이터 내에서 함수를 호출하려면 다음 방법 중 하나를 사용하세요.
@remote 데코레이터 메서드를 사용하여 함수를 호출하는 경우 훈련 작업은 함수가 완료될 때까지 기다렸다가 새 작업을 시작합니다. 하지만 RemoteExecutor
API를 사용하는 경우 둘 이상의 작업을 병렬로 실행할 수 있습니다. 다음 섹션에서는 함수를 호출하는 두 가지 방법을 모두 보여줍니다.
@remote 데코레이터를 사용하여 함수를 호출
@remote 데코레이터를 사용하여 함수에 주석을 달 수 있습니다. SageMaker AI는 데코레이터 내부의 코드를 SageMaker 훈련 작업으로 변환합니다. 그러면 훈련 작업은 데코레이터 내에서 함수를 호출하고 작업이 완료될 때까지 기다립니다. 다음 코드 예제는 필요한 라이브러리를 가져오고, SageMaker AI 인스턴스를 시작하고, @remote 데코레이터로 매트릭스 곱셈에 주석을 추가하는 방법을 보여줍니다.
from sagemaker.remote_function import remote
import numpy as np
@remote(instance_type="ml.m5.large
")
def matrix_multiply(a, b):
return np.matmul(a, b)
a = np.array([[1, 0],
[0, 1]])
b = np.array([1, 2])
assert (matrix_multiply(a, b) == np.array([1,2])).all()
데코레이터는 다음과 같이 정의됩니다.
def remote(
*,
**kwarg):
...
데코레이팅된 함수를 호출하면 SageMaker Python SDK는 오류로 인해 발생한 모든 예외를 로컬 메모리로 로드합니다. 다음 코드 예제에서는 divide 함수에 대한 첫 번째 호출이 성공적으로 완료되고 그 결과가 로컬 메모리로 로드됩니다. divide 함수를 두 번째로 호출하면 코드가 오류를 반환하고 이 오류가 로컬 메모리로 로드됩니다.
from sagemaker.remote_function import remote
import pytest
@remote()
def divide(a, b):
return a/b
# the underlying job is completed successfully
# and the function return is loaded
assert divide(10, 5) == 2
# the underlying job fails with "AlgorithmError"
# and the function exception is loaded into local memory
with pytest.raises(ZeroDivisionError):
divide(10, 0)
참고
데코레이팅된 함수는 원격 작업으로 실행됩니다. 스레드가 중단되더라도 기본 작업은 중지되지 않습니다.
로컬 변수 값을 변경하는 방법
데코레이터 함수는 원격 머신에서 실행됩니다. 데코레이팅된 함수 내에서 로컬이 아닌 변수 또는 입력 인수를 변경해도 로컬 값은 변경되지 않습니다.
다음 코드 예제에서는 데코레이터 함수 내에 목록과 딕셔너리가 추가됩니다. 이는 데코레이터 함수가 호출될 때 변경되지 않습니다.
a = []
@remote
def func():
a.append(1)
# when func is invoked, a in the local memory is not modified
func()
func()
# a stays as []
a = {}
@remote
def func(a):
# append new values to the input dictionary
a["key-2"] = "value-2"
a = {"key": "value"}
func(a)
# a stays as {"key": "value"}
데코레이터 함수 내에서 선언된 로컬 변수 값을 변경하려면 함수에서 변수를 반환하세요. 다음 코드 예제는 함수에서 반환되는 로컬 변수 값이 변경되는 것을 보여줍니다.
a = {"key-1": "value-1"}
@remote
def func(a):
a["key-2"] = "value-2"
return a
a = func(a)
-> {"key-1": "value-1", "key-2": "value-2"}
데이터 직렬화 및 역직렬화
원격 함수를 호출하면 SageMaker AI는 입력 및 출력 단계에서 함수 인수를 자동으로 직렬화합니다. 함수 인수와 반환은 cloudpickle
-
딕셔너리, 리스트, 부동 소수, 정수, 문자열, 부울 값, 튜플 등 내장된 Python 객체
-
Numpy 배열
-
Pandas Dataframe
-
Scikit-learn 데이터세트 및 예측기
-
PyTorch 모델
-
TensorFlow 모델
-
XGBoost용 Booster 클래스
다음은 몇 가지 제한 사항과 함께 사용 가능합니다.
-
Dask DataFrame
-
XGBoost Dmatrix 클래스
-
TensorFlow 데이터 세트 및 하위 클래스
-
PyTorch 모델
다음 섹션에는 원격 함수에 몇 가지 제한이 있는 이전 Python 클래스를 사용하는 모범 사례, SageMaker AI가 직렬화된 데이터를 저장하는 위치 및 이에 대한 액세스를 관리하는 방법에 대한 정보가 포함되어 있습니다.
원격 데이터 직렬화 지원이 제한된 Python 클래스 모범 사례
이 섹션에 나열된 Python 클래스는 제한 사항과 함께 사용할 수 있습니다. 다음 섹션에서는 다음과 같은 Python 클래스를 사용하는 방법에 대한 모범 사례를 설명합니다.
-
Dask
DataFrames -
XGBoost DMatric 클래스
-
TensorFlow 데이터세트 및 하위 클래스
-
PyTorch 모델
Dask
-
Dask DataFrame을 원격 함수로 전달하는 방법
-
Dask DataFrame의 요약 통계를 Pandas DataFrame으로 변환하는 방법
Dask DataFrame을 원격 함수로 전달하는 방법
Dask DataFrame
#Do not pass a Dask DataFrame to your remote function as follows
def clean(df: dask.DataFrame ):
cleaned = df[] \ ...
Dask는 사용자가 DataFrame을 사용할 때만 메모리에 Dask DataFrame 데이터를 로드합니다. 원격 함수 내에서 Dask DataFrame을 사용하려면 데이터 경로를 제공해야 합니다. 그러면 Dask는 코드 실행 시 지정한 데이터 경로에서 직접 데이터세트를 읽습니다.
다음 코드 예제에서는 원격 함수 clean
내에서 Dask DataFrame을 사용하는 방법을 보여줍니다. 코드 예제에서는 Dask DataFrame 대신 raw_data_path
가 clean으로 전달됩니다. 코드가 실행되면 raw_data_path
에 지정된 Amazon S3 버킷의 위치에서 데이터세트를 직접 읽습니다. 그런 다음 persist
함수는 후속 random_split
함수를 쉽게 수행할 수 있도록 데이터세트를 메모리에 보관하고 Dask DataFrame API 함수를 사용하여 S3 버킷의 출력 데이터 경로에 다시 기록합니다.
import dask.dataframe as dd
@remote(
instance_type='ml.m5.24xlarge
',
volume_size=300
,
keep_alive_period_in_seconds=600
)
#pass the data path to your remote function rather than the Dask DataFrame itself
def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]):
df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame
cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\
.drop(['column_b', 'column_c'], axis=1)\
.persist() #keep the data in memory to facilitate the following random_split operation
train_df, test_df = cleaned.random_split(split_ratio, random_state=10)
train_df.to_parquet(os.path.join(output_data_path, 'train')
test_df.to_parquet(os.path.join(output_data_path, 'test'))
clean("s3://amzn-s3-demo-bucket/raw/
", "s3://amzn-s3-demo-bucket/cleaned/
", split_ratio=[0.7, 0.3]
)
Dask DataFrame의 요약 통계를 Pandas DataFrame으로 변환하는 방법
Dask DataFrame의 요약 통계는 다음 예제 코드와 같이 compute
메서드를 호출하여 Pandas DataFrame으로 변환할 수 있습니다. 이 예제에서 S3 버킷에는 메모리 또는 Pandas 데이터 프레임에 담을 수 없는 대량 Dask DataFrame이 포함되어 있습니다. 다음 예제에서 원격 함수는 데이터세트를 스캔하고 출력 통계를 포함하는 Dask DataFrame을 describe
에서 Pandas DataFrame으로 반환합니다.
executor = RemoteExecutor(
instance_type='ml.m5.24xlarge
',
volume_size=300
,
keep_alive_period_in_seconds=600
)
future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/
").describe().compute())
future.result()
DMatrix는 XGBoost에서 데이터를 로드하는 데 사용하는 내부 데이터 구조입니다. DMatrix 객체는 컴퓨팅 세션 간에 쉽게 이동할 수 있도록 피클링할 수 없습니다. DMatrix 인스턴스를 직접 전달하면 SerializationError
로 실패하게 됩니다.
원격 함수에 데이터 객체를 전달하고 XGBoost를 사용하여 훈련하는 방법
Pandas DataFrame을 DMatrix 인스턴스로 변환하고 이를 원격 함수의 훈련에 사용하려면 다음 코드 예제와 같이 원격 함수에 직접 전달합니다.
import xgboost as xgb
@remote
def train(df, params):
#Convert a pandas dataframe into a DMatrix DataFrame and use it for training
dtrain = DMatrix(df)
return xgb.train(dtrain, params)
TensorFlow 데이터세트 및 하위 클래스는 TensorFlow가 훈련 중에 데이터를 로드하는 데 사용하는 내부 객체입니다. TensorFlow 데이터세트 및 하위 클래스는 컴퓨팅 세션 간에 쉽게 이동할 수 있도록 피클링할 수 없습니다. Tensorflow 데이터세트 또는 하위 클래스를 직접 전달하면 SerializationError
로 실패하게 됩니다. 다음 코드 예제와 같이 Tensorflow I/O API를 사용하여 스토리지에서 데이터를 로드합니다.
import tensorflow as tf
import tensorflow_io as tfio
@remote
def train(data_path: str, params):
dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt"))
...
train("s3://amzn-s3-demo-bucket/data
", {})
PyTorch 모델은 직렬화할 수 있으며 로컬 환경과 원격 함수 간에 전달 가능합니다. 로컬 환경과 원격 환경의 디바이스 유형(예: GPU 및 CPU)이 다른 경우 훈련된 모델을 로컬 환경에 반환할 수 없습니다. 예를 들어 GPU가 없는 로컬 환경에서 다음 코드를 개발하고 GPU가 있는 인스턴스에서 실행할 경우 훈련된 모델을 직접 반환하면 DeserializationError
가 발생합니다.
# Do not return a model trained on GPUs to a CPU-only environment as follows
@remote(instance_type='ml.g4dn.xlarge
')
def train(...):
if torch.cuda.is_available():
device = torch.device("cuda")
else:
device = torch.device("cpu") # a device without GPU capabilities
model = Net().to(device)
# train the model
...
return model
model = train(...) #returns a DeserializationError if run on a device with GPU
GPU 환경에서 훈련된 모델을 CPU 기능만 포함된 모델로 반환하려면 아래 코드 예제와 같이 PyTorch 모델 I/O API를 직접 사용합니다.
import s3fs
model_path = "s3://amzn-s3-demo-bucket/folder/
"
@remote(instance_type='ml.g4dn.xlarge
')
def train(...):
if torch.cuda.is_available():
device = torch.device("cuda")
else:
device = torch.device("cpu")
model = Net().to(device)
# train the model
...
fs = s3fs.FileSystem()
with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file:
torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU)
train(...) #use the model to train on either CPUs or GPUs
model = Net()
fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file:
model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))
SageMaker AI가 직렬화된 데이터를 저장하는 위치
원격 함수를 호출하면 SageMaker AI는 입력 및 출력 단계에서 함수 인수를 자동으로 직렬화하고 값을 반환합니다. 직렬화된 이 데이터는 S3 버킷의 루트 디렉터리에 저장됩니다. 구성 파일에 루트 디렉터리 <s3_root_uri>
를 지정합니다. 파라미터 job_name
이 자동으로 생성됩니다.
루트 디렉터리에서 SageMaker AI는 현재 작업 디렉터리, 직렬화된 함수, 직렬화된 함수에 대한 인수, 결과 및 직렬화된 함수 호출로 인해 발생한 모든 예외를 포함하는 <job_name>
폴더를 생성합니다.
<job_name>
아래 디렉터리 workdir
에는 현재 작업 디렉터리의 압축 아카이브가 들어 있습니다. 압축 아카이브에는 작업 디렉터리의 모든 Python 파일과 requirements.txt
파일이 포함되어 있는데 이는 원격 함수를 실행하는 데 필요한 종속성을 지정합니다.
다음은 구성 파일에 지정하는 S3 버킷의 폴더 구조 예제입니다.
<s3_root_uri>
/ # specified by s3_root_uri or S3RootUri
<job_name>/ #automatically generated for you
workdir/workspace.zip # archive of the current working directory (workdir)
function/ # serialized function
arguments/ # serialized function arguments
results/ # returned output from the serialized function including the model
exception/ # any exceptions from invoking the serialized function
S3 버킷에 지정하는 루트 디렉터리는 장기 스토리지용이 아닙니다. 직렬화된 데이터는 직렬화 중에 사용된 Python 버전 및 기계 학습(ML) 프레임워크 버전과 밀접하게 연결되어 있습니다. Python 버전 또는 ML 프레임워크를 업그레이드하면 직렬화된 데이터를 사용하지 못할 수 있습니다. 대신 다음을 수행합니다.
-
Python 버전과 ML 프레임워크에 구애받지 않는 형식으로 모델 및 모델 아티팩트를 저장합니다.
-
Python 또는 ML 프레임워크를 업그레이드하는 경우 장기 스토리지에서 모델 결과에 액세스할 수 있습니다.
중요
지정된 시간이 지난 후 직렬화된 데이터를 삭제하려면 S3 버킷에 수명 구성을 설정합니다.
참고
Python pickle
원격 함수의 구성 파일에 포함할 내용에 대한 추가 정보는 구성 파일을 참조하세요.
직렬화된 데이터에 대한 액세스
관리자는 직렬화된 데이터 설정을 제공할 수 있으며 여기에는 구성 파일에서의 위치 및 모든 암호화 설정이 포함됩니다. 기본적으로 직렬화된 데이터는 AWS Key Management Service (AWS KMS) 키로 암호화됩니다. 또한 관리자는 버킷 정책을 사용하여 구성 파일에 지정한 루트 디렉터리의 액세스를 제한할 수 있습니다. 구성 파일은 여러 프로젝트 및 작업에서 공유하며 사용할 수 있습니다. 자세한 정보는 구성 파일을 참조하세요.
RemoteExecutor
API를 사용하여 함수 호출
RemoteExecutor
API를 사용하여 함수를 호출할 수 있습니다. SageMaker AI Python SDK는 RemoteExecutor
호출 내부의 코드를 SageMaker AI 훈련 작업으로 변환합니다. 그러면 훈련 작업에서 함수를 비동기식 작업으로 호출하고 퓨처를 반환합니다. RemoteExecutor
API를 사용하는 경우 둘 이상의 작업을 병렬로 실행할 수 있습니다. Python의 퓨처에 대한 자세한 내용은 Future
다음 코드 예제는 필요한 라이브러리를 가져오고, 함수를 정의하고, SageMaker AI 인스턴스를 시작하고, API를 사용하여 병렬로 2
작업을 실행하기 위한 요청을 제출하는 방법을 보여줍니다.
from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="
ml.m5.large
") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()
RemoteExecutor
클래스는 Concurrent.Futures.Executor
다음 코드 예제에서는 RemoteExecutorAPI
를 사용하여 함수를 정의하고 호출하는 방법을 보여줍니다. 이 예제에서 RemoteExecutor
는 총 4
개 작업을 제출하지만 2
개만 병렬로 제출합니다. 마지막 두 개 작업은 오버헤드를 최소화하면서 클러스터를 재사용합니다.
from sagemaker.remote_function.client import RemoteExecutor
def divide(a, b):
return a/b
with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e:
futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]]
for future in futures:
print(future.result())
max_parallel_job
파라미터는 컴퓨팅 리소스 할당을 최적화하지 않고 속도 제한 메커니즘으로만 사용됩니다. 이전 코드 예제에서 RemoteExecutor
는 작업이 제출되기 전에 두 개의 병렬 작업을 위한 컴퓨팅 리소스를 예약하지 않습니다. max_parallel_job
또는 @remote 데코레이터의 기타 파라미터에 대한 자세한 내용은 원격 함수 클래스 및 메서드 사양
RemoteExecutor
API의 퓨처 클래스
퓨처 클래스는 반환 함수가 비동기식으로 호출될 때 훈련 작업의 반환 함수를 나타내는 퍼블릭 클래스입니다. 퓨처 클래스는 Concurrent.futures.future