本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
叫用遠端函數
若要在 @remote 裝飾項目內部調用函式,請採用下列其中一種方法:
如果您使用 @remote 裝飾項目方法來調用函式,則訓練工作將等待函式完成,然後再開始新工作。不過,如果您使用 RemoteExecutor
API,則可以平行執行多個任務。以下區段展示調用函式的兩種方法。
利用 @remote 裝飾項目調用函式
您可以使用 @remote 裝飾器來註釋函數。 SageMaker 會將裝飾器內的程式碼轉換為 SageMaker 訓練任務。然後,訓練工作會調用裝飾項目內部的函式,並等待工作完成。下列程式碼範例示範如何使用 @remote 裝飾程式匯入所需的程式庫、啟動 SageMaker執行個體,以及註釋矩陣乘法。
from sagemaker.remote_function import remote import numpy as np @remote(instance_type="
ml.m5.large
") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()
裝飾項目定義如下。
def remote( *, **kwarg): ...
當您叫用裝飾函數時, SageMaker Python 會將錯誤產生的任何例外SDK狀況載入本機記憶體。下列代碼範例成功完成第一次呼叫除法函式,並將結果載入本機記憶體。在第二次呼叫除法函式時,代碼傳回錯誤,並將此錯誤載入本機記憶體。
from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
注意
裝飾的函式以遠端工作方式執行。如執行緒被中斷,基礎工作將不會停止。
如何變更本機變數的值
透過遠端機器執行裝飾項目函式。變更裝飾函式內部的非本機變數或輸入引數不會變更本機值。
在下列代碼範例,清單與字典會附加於裝飾項目函式內部。當調用裝飾項目函式時,這點不會變更。
a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}
若要變更在裝飾項目函式內部宣告的本機變數值,請從函式傳回該變數。下列代碼範例示範當從函式傳回本機變數時,會變更其值。
a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}
資料序列化及還原序列化
當您叫用遠端函數時, 會在輸入和輸出階段自動 SageMaker 序列化函數引數。函數引數和傳回會使用 cloudpickle
-
內建 Python 物件,包含字典、清單、浮點數、ints、字串、布林值以及元組
-
Numpy 陣列
-
Pandas Dataframes
-
Scikit-learn 資料集與估算器
-
PyTorch 模型
-
TensorFlow 模型
-
的 Booster 類別 XGBoost
以下內容可於部分限制下使用。
-
Dask DataFrames
-
XGBoost Dmatrix 類別
-
TensorFlow 資料集和子類別
-
PyTorch 模型
下一節包含使用先前 Python 類別的最佳實務,其中包含遠端函數中有一些限制、 SageMaker 儲存序列化資料的位置相關資訊,以及如何管理存取。
Python 類別的最佳實務 (針對遠端資料序列化提供有限支援)
您可以在有限制的情況使用本區段所列的 Python 類別。下個區段將討論使用下列 Python 類別的最佳實務。
-
Dask
DataFrames -
XGBoost DMatric 類別
-
TensorFlow 資料集和子類別
-
PyTorch 模型
Dask
-
如何將 Dask 傳遞 DataFrame 至遠端函數
-
如何將摘要統計資料從 Dask DataFrame 轉換為 Pandas DataFrame
如何將 Dask 傳遞 DataFrame 至遠端函數
Dask DataFrames
#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...
只有當您使用 時,Dask 才會將資料從 Dask 載入 DataFrame 記憶體 DataFrame 。如果您想要在遠端函數 DataFrame 中使用 Dask,請提供資料 的路徑。然後,Dask 將於執行代碼時,直接從您指定的資料路徑讀取資料集。
下列程式碼範例示範如何在遠端函數 中使用 Dask DataFrameclean
。在程式碼範例中, raw_data_path
會傳遞為清除,而不是 Dask DataFrame。當代碼執行時,會從 raw_data_path
指定的 Amazon S3 儲存貯體位置直接讀取資料集。然後persist
,函數會將資料集保留在記憶體中,以促進後續random_split
函數,並使用 Dask DataFrame API 函數寫回 S3 儲存貯體中的輸出資料路徑。
import dask.dataframe as dd @remote( instance_type='
ml.m5.24xlarge
', volume_size=300
, keep_alive_period_in_seconds=600
) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/
", "s3://amzn-s3-demo-bucket/cleaned/
", split_ratio=[0.7, 0.3]
)
如何將摘要統計資料從 Dask DataFrame 轉換為 Pandas DataFrame
DataFrame 透過叫用 compute
方法 DataFrame ,可將來自 Dask 的摘要統計資料轉換為 Pandas,如下列範例程式碼所示。在此範例中,S3 儲存貯體包含 DataFrame 無法放入記憶體或 Pandas 資料架構的大型 Dask。在下列範例中,遠端函數會掃描資料集,並將包含輸出統計資料的 Dask DataFrame 從 傳回describe
至 Pandas DataFrame。
executor = RemoteExecutor( instance_type='
ml.m5.24xlarge
', volume_size=300
, keep_alive_period_in_seconds=600
) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/
").describe().compute()) future.result()
DMatrix 是 XGBoost 用來載入資料的內部資料結構。無法挑選DMatrix物件,以便在運算工作階段之間輕鬆移動。直接傳遞的DMatrix執行個體會失敗,但會採用 SerializationError
。
如何將資料物件傳遞至遠端函數,並使用 進行訓練 XGBoost
若要將 Pandas 轉換為 DataFrame DMatrix執行個體,並在遠端函數中使用它進行訓練,請直接將其傳遞至遠端函數,如下列程式碼範例所示。
import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)
TensorFlow 資料集和子類別是 用於在訓練期間 TensorFlow 載入資料的內部物件。 TensorFlow 資料集和子類別無法挑選,以便在運算工作階段之間輕鬆移動。直接傳遞 Tensorflow 資料集或子類別會失敗,並顯示 SerializationError
。使用 Tensorflow I/O 從儲存體APIs載入資料,如下列程式碼範例所示。
import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("
s3://amzn-s3-demo-bucket/data
", {})
PyTorch 模型是序列化的,可以在您的本機環境和遠端函數之間傳遞。如果您的本機環境和遠端環境具有不同的裝置類型,例如 (GPUs 和 CPUs),則您無法將訓練過的模型傳回至本機環境。例如,如果下列程式碼是在沒有 的本機環境中開發,GPUs但在具有 的執行個體中執行GPUs,則直接傳回訓練的模型將導致 DeserializationError
。
# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='
ml.g4dn.xlarge
') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU
若要將環境中訓練GPU的模型傳回僅包含 CPU 功能的模型,請APIs直接使用 PyTorch 模型 I/O,如以下程式碼範例所示。
import s3fs model_path = "
s3://amzn-s3-demo-bucket/folder/
" @remote(instance_type='ml.g4dn.xlarge
') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))
SageMaker 存放序列化資料的 位置
當您叫用遠端函數時, SageMaker 會在輸入和輸出階段自動序列化函數引數和傳回值。此序列化資料會儲存於 S3 儲存貯體的根目錄。您可在組態檔案指定根目錄 <s3_root_uri>
。系統會自動為您產生參數 job_name
。
在根目錄下, 會 SageMaker 建立資料夾,該<job_name>
資料夾會保留您目前的工作目錄、序列化函數、序列化函數的引數、結果,以及叫用序列化函數所產生的任何例外狀況。
在 <job_name>
下方,目錄 workdir
會包含目前工作目錄的已壓縮封存。已壓縮封存包含工作目錄與 requirements.txt
檔案的任何 Python 檔案,該檔案會指定執行遠端函式所需的任何相依性。
以下範例針對您在組態檔案指定的 S3 儲存貯體顯示其資料夾結構。
<s3_root_uri>
/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function
您在 S3 儲存貯體指定的根目錄並不適用長期儲存。序列化資料與序列化期間所用的 Python 版本與機器學習 (ML) 架構版本緊密關聯。如您升級 Python 版本或機器學習 (ML) 架構,則可能無法使用序列化資料。相反地,請執行下列動作。
-
以與 Python 版本與機器學習 (ML) 架構無關的格式儲存模型及模型成品。
-
如您升級 Python 或機器學習 (ML) 架構,請從長期儲存存取模型結果。
重要
若要在指定的時間量後刪除序列化資料,請在 S3 儲存貯體設定存留期組態。
注意
使用 Python pickle
如需更多資訊了解遠端函式組態檔案所應包含的內容,請參閱組態檔案。
存取序列化資料
管理員可為序列化資料提供設定,包含其位置及組態檔案的任何加密設定。根據預設,序列化資料會使用 AWS Key Management Service (AWS KMS) 金鑰加密。管理員也可利用儲存貯體政策來限制存取您在組態檔案指定的根目錄。可在專案與工作之間共用及使用組態檔案。如需更多資訊,請參閱組態檔案。
使用 RemoteExecutor
API 來叫用函數
您可以使用 RemoteExecutor
API 來叫用函數。 SageMaker Python SDK會將RemoteExecutor
呼叫 SageMaker中的程式碼轉換為訓練任務。然後,訓練工作會調用該函式作為非同步操作,並傳回未來。如果您使用 RemoteExecutor
API,則可以平行執行多個訓練任務。有關 Python 未來的更多相關資訊,請參閱未來
下列程式碼範例示範如何匯入所需的程式庫、定義函數、啟動 SageMaker 執行個體,並使用 API提交平行執行2
任務的請求。
from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="
ml.m5.large
") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()
RemoteExecutor
類別是 concurrent.futures.Executor
下列代碼範例示範如何定義函式並使用 RemoteExecutorAPI
來呼叫函式。在此範例,RemoteExecutor
將總共提交 4
項任務,但僅 2
個為平行處理。最後兩個任務將以最小額外負荷重複使用叢集。
from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())
max_parallel_job
參數僅做為速率限制機制,而不會最佳化運算資源配置。在先前的代碼範例,在提交任何工作之前,RemoteExecutor
不會為兩個 平行工作保留運算資源。如需更多相關資訊了解 max_parallel_job
或 @remote 裝飾項目的其他參數,請參閱遠端函式類別與方法規格
的未來類別 RemoteExecutor
API
未來類別是公有類別,代表訓練工作於非同步調用時的傳回函式。未來類別實作 concurrent.futures.Future