リモート関数を呼び出す - Amazon SageMaker

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

リモート関数を呼び出す

@remote デコレータ内の関数を呼び出すには、次のいずれかの方法を使用します。

@remote デコレータメソッドを使用して関数を呼び出す場合、トレーニングジョブでは、関数が完了するのを待ってから新しいタスクを開始します。ただし、 を使用する場合はRemoteExecutorAPI、複数のジョブを並行して実行できます。以下のセクションでは、関数を呼び出す両方の方法を示します。

@remote デコレータを使用して関数を呼び出す

@remote デコレータを使用して関数に注釈を付けることができます。 SageMaker はデコレータ内のコードを SageMaker トレーニングジョブに変換します。その後、トレーニングジョブでは、デコレータ内部の関数を呼び出し、ジョブが完了するのを待ちます。次のコード例は、必要なライブラリをインポートし、インスタンスを起動 SageMakerし、@remote デコレータで行列乗算に注釈を付ける方法を示しています。

from sagemaker.remote_function import remote import numpy as np @remote(instance_type="ml.m5.large") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()

デコレータは次のように定義されます。

def remote( *, **kwarg): ...

デコレーションされた関数を呼び出すと、 SageMaker Python はエラーによって発生した例外をローカルメモリにSDKロードします。次のコードサンプルでは、最初の divide 関数の呼び出しが正常に完了し、結果がローカルメモリに読み込まれます。2 回目の divide 関数の呼び出しで、コードからエラーが返され、このエラーがローカルメモリに読み込まれます。

from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
注記

修飾された関数はリモートジョブとして実行されます。スレッドが中断されても、基本となるジョブは停止しません。

ローカル変数の値を変更する方法

デコレータ関数はリモートマシン上で実行されます。修飾された関数内部の非ローカル変数や入力引数を変更しても、ローカル値は変更されません。

次のコードサンプルでは、デコレータ関数内部にリスト (list) と 辞書 (dict) が追加されています。これはデコレータ関数が呼び出されても変更されません。

a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}

デコレータ関数内で宣言されたローカル変数の値を変更するには、関数から変数を返します。次のコードサンプルでは、ローカル変数の値が関数から返されるときに変更されることを示しています。

a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}

データのシリアル化と逆シリアル化

リモート関数を呼び出すと、 は入力および出力ステージ中に関数引数 SageMaker を自動的にシリアル化します。関数の引数と戻り値は、cloudpickle を使用してシリアル化されます。 は、次の Python オブジェクトと関数のシリアル化 SageMaker をサポートしています。

  • 辞書、リスト、浮動小数点数、整数、文字列、ブール値、タプルを含む組み込みの Python オブジェクト

  • Numpy 配列

  • Pandas DataFrame

  • Scikit-learn のデータセットと推定器

  • PyTorch モデル

  • TensorFlow モデル

  • のブースタークラス XGBoost

以下を使用する場合は、いくつか制限があります。

  • Dask DataFrames

  • XGBoost Dmatrix クラス

  • TensorFlow データセットとサブクラス

  • PyTorch モデル

次のセクションでは、以前の Python クラスを使用するためのベストプラクティスと、 がシリアル化されたデータ SageMaker を保存する場所に関する情報、およびアクセスを管理する方法を示します。

リモートデータシリアル化のサポートが制限されている Python クラスのベストプラクティス

このセクションに記載されている Python クラスは制限付きで使用できます。次のセクションでは、次の Python クラスの使用方法に関するベストプラクティスについて説明します。

  • Dask DataFrames

  • XGBoost DMatric クラス

  • TensorFlow データセットとサブクラス

  • PyTorch モデル

Dask は、Python のパラレルコンピューティングに使用されるオープンソースのライブラリです。このセクションでは、次の内容を説明します。

  • リモート関数 DataFrame に Dask を渡す方法

  • Dask DataFrame から Pandas にサマリー統計を変換する方法 DataFrame

リモート関数 DataFrame に Dask を渡す方法

Dask DataFrames は、使用可能な量よりも多くのメモリを必要とするデータセットを保持できるため、大規模なデータセットの処理によく使用されます。これは、Dask DataFrame がローカルデータをメモリにロードしないためです。リモート関数に関数引数として Dask DataFrame を渡すと、Dask はデータ自体ではなく、ローカルディスクまたはクラウドストレージ内のデータへの参照を渡すことがあります。次のコードは、空の で動作するリモート関数 DataFrame 内に Dask を渡す例を示しています DataFrame。

#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...

Dask は、 を使用する場合のみ、Dask からデータをメモリ DataFrame にロードします DataFrame 。リモート関数 DataFrame 内で Dask を使用する場合は、データ へのパスを指定します。そうすると、Dask はコードの実行時に指定したデータパスから直接データセットを読み取ります。

次のコード例は、リモート関数 内で Dask DataFrame を使用する方法を示していますclean。コード例では、 raw_data_pathは Dask の代わりにクリーンに渡されます DataFrame。コードを実行すると、raw_data_path で指定された Amazon S3 バケットの場所からデータセットが直接読み取られます。次に、persist関数はデータセットをメモリに保持して後続のrandom_split関数を容易にし、Dask DataFrame API 関数を使用して S3 バケットの出力データパスに書き戻します。

import dask.dataframe as dd @remote( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/", "s3://amzn-s3-demo-bucket/cleaned/", split_ratio=[0.7, 0.3])
Dask DataFrame から Pandas にサマリー統計を変換する方法 DataFrame

Dask のサマリー統計は、次のサンプルコードに示すように、 computeメソッドを呼び出し DataFrame て Pandas に変換 DataFrame できます。この例では、S3 バケットには、メモリや Pandas データフレームに収ま DataFrame らない大きな Dask が含まれています。次の例では、リモート関数がデータセットをスキャンし、 から Pandas describeに出力統計を含む Dask DataFrame を返します DataFrame。

executor = RemoteExecutor( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/").describe().compute()) future.result()

DMatrix は、 がデータをロードXGBoostするために使用する内部データ構造です。コンピューティングセッション間で簡単に移動するために、DMatrixオブジェクトをピックすることはできません。DMatrix インスタンスを直接渡すと、 で失敗しますSerializationError

データオブジェクトをリモート関数に渡し、 でトレーニングする方法 XGBoost

Pandas DataFrame をDMatrixインスタンスに変換し、リモート関数のトレーニングに使用するには、次のコード例に示すように、リモート関数に直接渡します。

import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)

TensorFlow データセットとサブクラスは、トレーニング中にデータをロード TensorFlow するために が使用する内部オブジェクトです。コンピューティングセッション間で簡単に移動するために TensorFlow データセットとサブクラスをピックすることはできません。Tensorflow のデータセットまたはサブクラスを直接渡すと、SerializationError のために失敗します。次のコード例に示すように、Tensorflow I/O APIsを使用してストレージからデータをロードします。

import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("s3://amzn-s3-demo-bucket/data", {})

PyTorch モデルはシリアル化可能で、ローカル環境とリモート関数の間で渡すことができます。ローカル環境とリモート環境のデバイスタイプが (GPUs や などCPUs) と異なる場合、トレーニングされたモデルをローカル環境に返すことはできません。例えば、次のコードが を使用していないローカル環境で開発GPUsされ、 を使用してインスタンスで実行される場合GPUs、トレーニングされたモデルを直接返すと になりますDeserializationError

# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU

GPU 環境でトレーニングされたモデルをCPU機能のみを含むモデルに戻すには、以下のコード例に示すように、 PyTorch モデル I/O APIsを直接使用します。

import s3fs model_path = "s3://amzn-s3-demo-bucket/folder/" @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))

がシリアル化されたデータ SageMaker を保存する

リモート関数を呼び出すと、 は関数引数 SageMaker を自動的にシリアル化し、入力および出力ステージ中に値を返します。このシリアル化されたデータは、S3 バケットのルートディレクトリに保存されます。設定ファイルでルートディレクトリ <s3_root_uri> を指定します。パラメータ job_name は自動的に生成されます。

ルートディレクトリの下に、現在の作業ディレクトリ、シリアル化された関数、シリアル化された関数の引数、結果、およびシリアル化された関数の呼び出しに起因する例外を保持する<job_name>フォルダ SageMaker を作成します。

<job_name> 下のディレクトリ workdir には、現在の作業ディレクトリの zip アーカイブのアーカイブが含まれています。zip アーカイブには、作業ディレクトリ内のすべての Python ファイルと、リモート関数を実行するのに必要な依存関係を指定する requirements.txt ファイルが含まれます。

以下は、設定ファイルで指定する S3 バケットのフォルダ構造の例です。

<s3_root_uri>/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function

S3 バケットで指定するルートディレクトリは、長期保存用ではありません。シリアル化されたデータは、シリアル化中に使用された Python バージョンと機械学習 (ML) フレームワークのバージョンと密接に関連付けられます。Python バージョンまたは機械学習フレームワークをアップグレードすると、シリアル化されたデータを使用できなくなる場合があります。代わりに、以下の手順を実行します。

  • モデルとモデルアーティファクトを、ご使用の Python バージョンおよび機械学習フレームワークに依存しない形式で保存します。

  • Python または機械学習フレームワークをアップグレードする場合、長期ストレージからモデル結果にアクセスします。

重要

指定された時間の経過後にシリアル化されたデータを削除するには、S3 バケットに有効期限設定を行います。

注記

Python pickle モジュールでシリアル化されたファイルは、、ParquetCSV、 などの他のデータ形式よりも移植性が低くなる可能性がありますJSON。ソースが不明なピクル化ファイルをロードする場合は注意してください。

リモート関数の設定ファイルに含める内容の詳細については、「設定ファイル」を参照してください。

シリアル化されたデータへのアクセス

管理者は、シリアル化されたデータの場所や構成ファイル内の暗号化設定など、シリアル化されたデータの設定を行うことができます。デフォルトでは、シリアル化されたデータは AWS Key Management Service (AWS KMS) キーで暗号化されます。管理者は、設定ファイルで指定したルートディレクトリへのアクセスをバケットポリシーで制限することもできます。設定ファイルはプロジェクトおよびジョブ間で共有して使用できます。詳細については、「 設定ファイル」を参照してください。

RemoteExecutor API を使用して関数を呼び出す

を使用して関数をRemoteExecutorAPI呼び出すことができます。 SageMaker Python SDKはRemoteExecutor呼び出し SageMaker内のコードをトレーニングジョブに変換します。その後、トレーニングジョブは非同期オペレーションとして関数を呼び出し、Future を返します。を使用する場合はRemoteExecutorAPI、複数のトレーニングジョブを並行して実行できます。Python の Future の詳細については、「Future」を参照してください。

次のコード例は、必要なライブラリをインポートし、関数を定義し、 SageMaker インスタンスを起動し、 API を使用して2ジョブを並行して実行するリクエストを送信する方法を示しています。

from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="ml.m5.large") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()

RemoteExecutor クラスは concurrent.futures.Executor ライブラリの実装です。

次のコード例は、RemoteExecutorAPI を使用して関数を定義し、それを呼び出す方法を示しています。この例では、RemoteExecutor は合計では 4 ジョブを送信しますが、並列では 2 のみ送信します。最後の 2 つのジョブは、最小限のオーバーヘッドでクラスターを再利用します。

from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())

max_parallel_job パラメータはレート制限メカニズムとしてのみ機能し、コンピューティングリソースの割り当ては最適化されません。前のコード例では、RemoteExecutor では、ジョブが送信される前に 2 つの並列ジョブのコンピューティングリソースを予約しません。max_parallel_job または @remote デコレータのその他のパラメータの詳細については、「Remote function classes and methods specification」を参照してください。

の将来のクラス RemoteExecutor API

future クラスは、非同期で呼び出されたときのトレーニングジョブからの return 関数を表す public クラスです。future クラスは concurrent.futures.Future クラスを実装しています。このクラスを使用すると、基になるジョブを操作したり、データをメモリに読み込んだりできます。