Memanggil fungsi jarak jauh - Amazon SageMaker

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memanggil fungsi jarak jauh

Untuk memanggil fungsi di dalam dekorator @remote, gunakan salah satu metode berikut:

Jika Anda menggunakan metode dekorator @remote untuk menjalankan fungsi, pekerjaan pelatihan akan menunggu fungsi selesai sebelum memulai tugas baru. Namun, jika Anda menggunakan RemoteExecutorAPI, Anda dapat menjalankan lebih dari satu pekerjaan secara paralel. Bagian berikut menunjukkan kedua cara menjalankan fungsi.

Gunakan dekorator @remote untuk menjalankan fungsi

Anda dapat menggunakan dekorator @remote untuk membubuhi keterangan suatu fungsi. SageMaker akan mengubah kode di dalam dekorator menjadi pekerjaan SageMaker pelatihan. Pekerjaan pelatihan kemudian akan memanggil fungsi di dalam dekorator dan menunggu pekerjaan selesai. Contoh kode berikut menunjukkan cara mengimpor pustaka yang diperlukan, memulai SageMaker instance, dan membubuhi keterangan perkalian matriks dengan dekorator @remote.

from sagemaker.remote_function import remote import numpy as np @remote(instance_type="ml.m5.large") def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) assert (matrix_multiply(a, b) == np.array([1,2])).all()

Dekorator didefinisikan sebagai berikut.

def remote( *, **kwarg): ...

Saat Anda menjalankan fungsi yang didekorasi, SageMaker SDK Python memuat pengecualian apa pun yang dimunculkan oleh kesalahan ke memori lokal. Dalam contoh kode berikut, panggilan pertama ke fungsi membagi selesai dengan sukses dan hasilnya dimuat ke memori lokal. Dalam panggilan kedua ke fungsi membagi, kode mengembalikan kesalahan dan kesalahan ini dimuat ke memori lokal.

from sagemaker.remote_function import remote import pytest @remote() def divide(a, b): return a/b # the underlying job is completed successfully # and the function return is loaded assert divide(10, 5) == 2 # the underlying job fails with "AlgorithmError" # and the function exception is loaded into local memory with pytest.raises(ZeroDivisionError): divide(10, 0)
catatan

Fungsi yang didekorasi dijalankan sebagai pekerjaan jarak jauh. Jika utas terputus, pekerjaan yang mendasarinya tidak akan dihentikan.

Cara mengubah nilai variabel lokal

Fungsi dekorator dijalankan pada mesin jarak jauh. Mengubah variabel non-lokal atau argumen masukan di dalam fungsi yang didekorasi tidak akan mengubah nilai lokal.

Dalam contoh kode berikut, daftar dan dict ditambahkan di dalam fungsi dekorator. Ini tidak berubah ketika fungsi dekorator dipanggil.

a = [] @remote def func(): a.append(1) # when func is invoked, a in the local memory is not modified func() func() # a stays as [] a = {} @remote def func(a): # append new values to the input dictionary a["key-2"] = "value-2" a = {"key": "value"} func(a) # a stays as {"key": "value"}

Untuk mengubah nilai variabel lokal yang dideklarasikan di dalam fungsi dekorator, kembalikan variabel dari fungsi. Contoh kode berikut menunjukkan bahwa nilai variabel lokal berubah ketika dikembalikan dari fungsi.

a = {"key-1": "value-1"} @remote def func(a): a["key-2"] = "value-2" return a a = func(a) -> {"key-1": "value-1", "key-2": "value-2"}

Serialisasi data dan deserialisasi

Saat Anda menjalankan fungsi jarak jauh, SageMaker secara otomatis membuat serial argumen fungsi Anda selama tahap input dan output. Argumen dan pengembalian fungsi diserialisasikan menggunakan cloudpickle. SageMaker mendukung serialisasi objek dan fungsi Python berikut.

  • Objek Python bawaan termasuk dict, list, float, int, string, nilai boolean, dan tupel

  • Array Numpy

  • Dataframes Panda

  • Scikit-learn dataset dan estimator

  • PyTorch model

  • TensorFlow model

  • Kelas Booster untuk XGBoost

Berikut ini dapat digunakan dengan beberapa batasan.

  • Dask DataFrames

  • Kelas XGBoost Dmatrix

  • TensorFlow dataset dan subclass

  • PyTorch model

Bagian berikut berisi praktik terbaik untuk menggunakan kelas Python sebelumnya dengan beberapa batasan dalam fungsi jarak jauh Anda, informasi tentang tempat SageMaker menyimpan data serial Anda dan cara mengelola akses ke sana.

Praktik terbaik untuk kelas Python dengan dukungan terbatas untuk serialisasi data jarak jauh

Anda dapat menggunakan kelas Python yang tercantum di bagian ini dengan batasan. Bagian selanjutnya membahas praktik terbaik untuk cara menggunakan kelas Python berikut.

  • Dask DataFrames

  • XGBoostDMatricKelas

  • TensorFlow dataset dan subclass

  • PyTorch model

Dask adalah perpustakaan sumber terbuka yang digunakan untuk komputasi paralel dengan Python. Bagian ini menunjukkan yang berikut ini.

  • Cara meneruskan Dask DataFrame ke fungsi jarak jauh Anda

  • Cara mengonversi statistik ringkasan dari Dask DataFrame menjadi Panda DataFrame

Cara meneruskan Dask DataFrame ke fungsi jarak jauh Anda

Dask sering DataFrames digunakan untuk memproses dataset besar karena mereka dapat menyimpan dataset yang membutuhkan lebih banyak memori daripada yang tersedia. Ini karena Dask DataFrame tidak memuat data lokal Anda ke dalam memori. Jika Anda meneruskan Dask DataFrame sebagai argumen fungsi ke fungsi jarak jauh Anda, Dask dapat meneruskan referensi ke data di disk lokal atau penyimpanan cloud Anda, bukan data itu sendiri. Kode berikut menunjukkan contoh melewati Dask DataFrame di dalam fungsi remote Anda yang akan beroperasi pada kosong DataFrame.

#Do not pass a Dask DataFrame to your remote function as follows def clean(df: dask.DataFrame ): cleaned = df[] \ ...

Dask akan memuat data dari Dask DataFrame ke dalam memori hanya ketika Anda menggunakan file. DataFrame Jika Anda ingin menggunakan Dask DataFrame di dalam fungsi jarak jauh, berikan jalur ke data. Kemudian Dask akan membaca dataset langsung dari jalur data yang Anda tentukan saat kode berjalan.

Contoh kode berikut menunjukkan cara menggunakan Dask DataFrame di dalam fungsi clean remote. Dalam contoh kode, raw_data_path diteruskan ke clean bukan Dask DataFrame. Saat kode berjalan, kumpulan data dibaca langsung dari lokasi bucket Amazon S3 yang ditentukan. raw_data_path Kemudian persist fungsi menyimpan dataset dalam memori untuk memfasilitasi random_split fungsi selanjutnya dan ditulis kembali ke jalur data output dalam bucket S3 menggunakan fungsi Dask DataFrame API.

import dask.dataframe as dd @remote( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) #pass the data path to your remote function rather than the Dask DataFrame itself def clean(raw_data_path: str, output_data_path: str: split_ratio: list[float]): df = dd.read_parquet(raw_data_path) #pass the path to your DataFrame cleaned = df[(df.column_a >= 1) & (df.column_a < 5)]\ .drop(['column_b', 'column_c'], axis=1)\ .persist() #keep the data in memory to facilitate the following random_split operation train_df, test_df = cleaned.random_split(split_ratio, random_state=10) train_df.to_parquet(os.path.join(output_data_path, 'train') test_df.to_parquet(os.path.join(output_data_path, 'test')) clean("s3://amzn-s3-demo-bucket/raw/", "s3://amzn-s3-demo-bucket/cleaned/", split_ratio=[0.7, 0.3])
Cara mengonversi statistik ringkasan dari Dask DataFrame menjadi Panda DataFrame

Ringkasan statistik dari Dask DataFrame dapat dikonversi menjadi Pandas DataFrame dengan menggunakan compute metode seperti yang ditunjukkan dalam contoh kode berikut. Dalam contoh, bucket S3 berisi Dask besar DataFrame yang tidak dapat masuk ke dalam memori atau ke dalam kerangka data Pandas. Dalam contoh berikut, fungsi jarak jauh memindai kumpulan data dan mengembalikan Dask yang DataFrame berisi statistik keluaran dari describe ke Pandas. DataFrame

executor = RemoteExecutor( instance_type='ml.m5.24xlarge', volume_size=300, keep_alive_period_in_seconds=600) future = executor.submit(lambda: dd.read_parquet("s3://amzn-s3-demo-bucket/raw/").describe().compute()) future.result()

DMatrixadalah struktur data internal yang digunakan oleh XGBoost untuk memuat data. Sebuah DMatrix objek tidak dapat diasinkan untuk bergerak dengan mudah di antara sesi komputasi. DMatrixInstans yang lewat secara langsung akan gagal dengan a. SerializationError

Cara meneruskan objek data ke fungsi jarak jauh Anda dan berlatih dengan XGBoost

Untuk mengonversi Pandas DataFrame menjadi DMatrix instance dan menggunakannya untuk pelatihan dalam fungsi jarak jauh Anda, teruskan langsung ke fungsi jarak jauh seperti yang ditunjukkan pada contoh kode berikut.

import xgboost as xgb @remote def train(df, params): #Convert a pandas dataframe into a DMatrix DataFrame and use it for training dtrain = DMatrix(df) return xgb.train(dtrain, params)

TensorFlow dataset dan subclass adalah objek internal yang digunakan oleh TensorFlow untuk memuat data selama pelatihan. TensorFlow kumpulan data dan subkelas tidak dapat diasinkan untuk berpindah dengan mudah di antara sesi komputasi. Melewati kumpulan data atau subkelas Tensorflow secara langsung akan gagal dengan file. SerializationError Gunakan Tensorflow I/O APIs untuk memuat data dari penyimpanan, seperti yang ditunjukkan pada contoh kode berikut.

import tensorflow as tf import tensorflow_io as tfio @remote def train(data_path: str, params): dataset = tf.data.TextLineDataset(tf.data.Dataset.list_files(f"{data_path}/*.txt")) ... train("s3://amzn-s3-demo-bucket/data", {})

PyTorch model dapat diserialkan dan dapat diteruskan antara lingkungan lokal Anda dan fungsi jarak jauh. Jika lingkungan lokal dan lingkungan jarak jauh Anda memiliki jenis perangkat yang berbeda, seperti (GPUsdanCPUs), Anda tidak dapat mengembalikan model terlatih ke lingkungan lokal Anda. Misalnya, jika kode berikut dikembangkan di lingkungan lokal tanpa GPUs tetapi dijalankan dalam sebuah instance denganGPUs, mengembalikan model terlatih secara langsung akan mengarah ke fileDeserializationError.

# Do not return a model trained on GPUs to a CPU-only environment as follows @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") # a device without GPU capabilities model = Net().to(device) # train the model ... return model model = train(...) #returns a DeserializationError if run on a device with GPU

Untuk mengembalikan model yang dilatih dalam GPU lingkungan ke model yang hanya berisi CPU kemampuan, gunakan PyTorch model I/O APIs secara langsung seperti yang ditunjukkan pada contoh kode di bawah ini.

import s3fs model_path = "s3://amzn-s3-demo-bucket/folder/" @remote(instance_type='ml.g4dn.xlarge') def train(...): if torch.cuda.is_available(): device = torch.device("cuda") else: device = torch.device("cpu") model = Net().to(device) # train the model ... fs = s3fs.FileSystem() with fs.open(os.path.join(model_path, 'model.pt'), 'wb') as file: torch.save(model.state_dict(), file) #this writes the model in a device-agnostic way (CPU vs GPU) train(...) #use the model to train on either CPUs or GPUs model = Net() fs = s3fs.FileSystem()with fs.open(os.path.join(model_path, 'model.pt'), 'rb') as file: model.load_state_dict(torch.load(file, map_location=torch.device('cpu')))

Tempat SageMaker menyimpan data serial Anda

Saat Anda memanggil fungsi jarak jauh, SageMaker secara otomatis membuat serial argumen fungsi Anda dan mengembalikan nilai selama tahap input dan output. Data serial ini disimpan di bawah direktori root di bucket S3 Anda. Anda menentukan direktori root,<s3_root_uri>, dalam file konfigurasi. Parameter job_name dibuat secara otomatis untuk Anda.

Di bawah direktori root, SageMaker buat <job_name> folder, yang menyimpan direktori kerja Anda saat ini, fungsi serial, argumen untuk fungsi serial Anda, hasil, dan pengecualian apa pun yang muncul dari menjalankan fungsi serial.

Di bawah<job_name>, direktori workdir berisi arsip zip dari direktori kerja Anda saat ini. Arsip zip mencakup file Python apa pun di direktori kerja Anda dan requirements.txt file, yang menentukan dependensi apa pun yang diperlukan untuk menjalankan fungsi jarak jauh Anda.

Berikut ini adalah contoh struktur folder di bawah bucket S3 yang Anda tentukan dalam file konfigurasi Anda.

<s3_root_uri>/ # specified by s3_root_uri or S3RootUri <job_name>/ #automatically generated for you workdir/workspace.zip # archive of the current working directory (workdir) function/ # serialized function arguments/ # serialized function arguments results/ # returned output from the serialized function including the model exception/ # any exceptions from invoking the serialized function

Direktori root yang Anda tentukan di bucket S3 Anda tidak dimaksudkan untuk penyimpanan jangka panjang. Data serial terkait erat dengan versi Python dan versi kerangka pembelajaran mesin (ML) yang digunakan selama serialisasi. Jika Anda meng-upgrade versi Python atau kerangka kerja ML, Anda mungkin tidak dapat menggunakan data serial Anda. Sebaliknya, lakukan hal berikut.

  • Simpan artefak model dan model Anda dalam format yang agnostik ke versi Python dan kerangka kerja MLmu.

  • Jika Anda memutakhirkan kerangka kerja Python atau HTML Anda, akses hasil model Anda dari penyimpanan jangka panjang Anda.

penting

Untuk menghapus data serial Anda setelah jangka waktu tertentu, tetapkan konfigurasi seumur hidup pada bucket S3 Anda.

catatan

File yang diserialkan dengan modul acar Python bisa kurang portabel dibandingkan format data lainnya CSV termasuk, Parket dan. JSON Berhati-hatilah saat memuat file acar dari sumber yang tidak dikenal.

Untuk informasi selengkapnya tentang apa yang harus disertakan dalam file konfigurasi untuk fungsi jarak jauh, lihat File Konfigurasi.

Akses ke data serial Anda

Administrator dapat menyediakan pengaturan untuk data serial Anda, termasuk lokasinya dan pengaturan enkripsi apa pun dalam file konfigurasi. Secara default, data serial dienkripsi dengan AWS Key Management Service ()AWS KMS Key. Administrator juga dapat membatasi akses ke direktori root yang Anda tentukan dalam file konfigurasi dengan kebijakan bucket. File konfigurasi dapat dibagikan dan digunakan di seluruh proyek dan pekerjaan. Untuk informasi selengkapnya, lihat File Konfigurasi.

Gunakan RemoteExecutor API untuk memanggil fungsi

Anda dapat menggunakan RemoteExecutor API untuk memanggil fungsi. SageMaker Python SDK akan mengubah kode di dalam RemoteExecutor panggilan menjadi pekerjaan SageMaker pelatihan. Pekerjaan pelatihan kemudian akan memanggil fungsi sebagai operasi asinkron dan mengembalikan masa depan. Jika Anda menggunakan RemoteExecutorAPI, Anda dapat menjalankan lebih dari satu pekerjaan pelatihan secara paralel. Untuk informasi lebih lanjut tentang futures di Python, lihat Futures.

Contoh kode berikut menunjukkan cara mengimpor pustaka yang diperlukan, mendefinisikan fungsi, memulai SageMaker instance, dan menggunakan API untuk mengirimkan permintaan untuk menjalankan 2 pekerjaan secara paralel.

from sagemaker.remote_function import RemoteExecutor def matrix_multiply(a, b): return np.matmul(a, b) a = np.array([[1, 0], [0, 1]]) b = np.array([1, 2]) with RemoteExecutor(max_parallel_job=2, instance_type="ml.m5.large") as e: future = e.submit(matrix_multiply, a, b) assert (future.result() == np.array([1,2])).all()

RemoteExecutorKelas adalah implementasi dari pustaka Concurrent.futures.Executor.

Contoh kode berikut menunjukkan bagaimana mendefinisikan fungsi dan memanggilnya menggunakanRemoteExecutorAPI. Dalam contoh ini, RemoteExecutor akan mengirimkan 4 pekerjaan secara total, tetapi hanya secara 2 paralel. Dua pekerjaan terakhir akan menggunakan kembali cluster dengan overhead minimal.

from sagemaker.remote_function.client import RemoteExecutor def divide(a, b): return a/b with RemoteExecutor(max_parallel_job=2, keep_alive_period_in_seconds=60) as e: futures = [e.submit(divide, a, 2) for a in [3, 5, 7, 9]] for future in futures: print(future.result())

max_parallel_jobParameter hanya berfungsi sebagai mekanisme pembatas laju tanpa mengoptimalkan alokasi sumber daya komputasi. Dalam contoh kode sebelumnya, RemoteExecutor tidak mencadangkan sumber daya komputasi untuk dua pekerjaan paralel sebelum pekerjaan apa pun dikirimkan. Untuk informasi selengkapnya tentang max_parallel_job atau parameter lain untuk dekorator @remote, lihat Kelas fungsi jarak jauh dan spesifikasi metode.

Kelas masa depan untuk RemoteExecutor API

Kelas future adalah kelas publik yang mewakili fungsi pengembalian dari pekerjaan pelatihan ketika dipanggil secara asinkron. Kelas future mengimplementasikan class concurrent.futures.future. Kelas ini dapat digunakan untuk melakukan operasi pada pekerjaan yang mendasarinya dan memuat data ke dalam memori.