使用 TorchServe 部署大型模型以進行推論 - Amazon SageMaker AI

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 TorchServe 部署大型模型以進行推論

本教學課程示範如何在 GPUs 上使用 TorchServe 在 Amazon SageMaker AI 中部署大型模型並提供推論。此範例將 Opt-30b 模型部署至 ml.g5 執行個體。您可以修改此設定以便使用其他模型和執行個體類型。以您自己的資訊取代此範例中的 italicized placeholder text

TorchServe 是功能強大的開放式平台,用於大型分散式模型推論。透過支援 PyTorch、原生 PiPPy、DeepSpeed 和 HuggingFace Accelerate 等熱門程式庫,提供統一的處理常式 API,這些 API 在分散式大型模型和非分散式模型推論情況中保持一致。如需更多資訊,請參閱 TorchServe 的大型模型推論文件

使用 TorchServe 的深度學習容器

若要在 SageMaker AI 上使用 TorchServe 部署大型模型,您可以使用其中一個 SageMaker AI 深度學習容器 DLCs)。根據預設,TorchServe 會安裝在所有 AWS PyTorch DLCs中。在模型載入期間,TorchServe 可以安裝專為 PiPPy、Deepspeed 和 Accelerate 等大型模型量身打造的特殊程式庫。

下表列出使用 TorchServe 的所有 SageMaker AI DLCs

DLC 類別 架構 硬體 範例 URL

SageMaker AI Framework Containers

PyTorch 2.0.0+

CPU、GPU

763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference:2.0.1-gpu-py310-cu118-ubuntu20.04-sagemaker

SageMaker AI Framework Graviton 容器

PyTorch 2.0.0+

CPU

763104351884.dkr.ecr.us-east-1.amazonaws.com/pytorch-inference-graviton:2.0.1-cpu-py310-ubuntu20.04-sagemaker

StabilityAI 推論容器

PyTorch 2.0.0+

GPU

763104351884.dkr.ecr.us-east-1.amazonaws.com/stabilityai-pytorch-inference:2.0.1-sgm0.1.0-gpu-py310-cu118-ubuntu20.04-sagemaker

神經元容器

PyTorch 1.13.1

Neuronx

763104351884.dkr.ecr.us-west-2.amazonaws.com/pytorch-inference-neuron:1.13.1-neuron-py310-sdk2.12.0-ubuntu20.04

開始使用

部署模型之前,請先完成先決條件。您還可以設定模型參數並自訂處理常式程式碼。

必要條件

若要開始使用,請務必確認您已具備下列先決條件:

  1. 確保您可存取 AWS 帳戶。設定您的環境,讓 AWS CLI 可以透過 IAM AWS 使用者或 IAM 角色存取您的帳戶。我們建議使用 IAM 角色。為了在個人帳戶進行測試,您可以將以下受管權限政策附加到 IAM 角色:

    如需更多將 IAM 政策連接至角色的相關資訊,請參閱 AWS IAM 使用者指南中的新增和移除 IAM 身分許可

  2. 在本機設定相依性,如以下範例所示。

    1. 安裝 第 2 版 AWS CLI:

      # Install the latest AWS CLI v2 if it is not installed !curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip" !unzip awscliv2.zip #Follow the instructions to install v2 on the terminal !cat aws/README.md
    2. 安裝 SageMaker AI 和 Boto3 用戶端:

      # If already installed, update your client #%pip install sagemaker pip --upgrade --quiet !pip install -U sagemaker !pip install -U boto !pip install -U botocore !pip install -U boto3

設定模型設定和參數

TorchServe 使用 torchrun 設定分散式環境以進行模型平行處理。TorchServe 能夠為大型模型支援多個工作者。在預設值中,TorchServe 使用循環配置資源演算法將 GPU 指派給主機上的工作者。在大型模型推論的情況下,會根據 model_config.yaml 檔案中指定的 GPU 數量,自動計算指派給每個工作者的 GPU 數量。環境變數 CUDA_VISIBLE_DEVICES (指定 GPU 裝置 ID 可於特定時間可見的環境變數) 根據此數量設定。

例如,假設節點上有 8 個 GPUs,而一個工作者在節點上需要 4 個 GPUs (nproc_per_node=4)。在此情況下,TorchServe 會將四個 GPU 指派給第一個工作者 (CUDA_VISIBLE_DEVICES="0,1,2,3"),並將四個 GPU 指派給第二個工作者 (CUDA_VISIBLE_DEVICES="4,5,6,7”)。

除了此預設行為之外,TorchServe 還為使用者提供為工作者指定 GPU 的彈性。例如,如果您在模型組態 YAML 檔案deviceIds: [2,3,4,5]中設定變數,然後設定 nproc_per_node=2,則 TorchServe 會指派 CUDA_VISIBLE_DEVICES=”2,3” 給第一個工作者並指派 CUDA_VISIBLE_DEVICES="4,5” 給第二個工作者。

在下列 model_config.yaml 範例中,我們為 OPT-30b 模型設定前端和後端參數。設定的前端參數為 parallelTypedeviceTypedeviceIds torchrun。如需有關您可以設定的前端參數的詳細資訊,請參閱 PyTorch GitHub 文件。後端設定是以允許自由樣式自訂的 YAML 對應為基礎。對於後端參數,我們定義 DeepSpeed 組態和自訂處理常式程式碼使用的其他參數。

# TorchServe front-end parameters minWorkers: 1 maxWorkers: 1 maxBatchDelay: 100 responseTimeout: 1200 parallelType: "tp" deviceType: "gpu" # example of user specified GPU deviceIds deviceIds: [0,1,2,3] # sets CUDA_VISIBLE_DEVICES torchrun: nproc-per-node: 4 # TorchServe back-end parameters deepspeed: config: ds-config.json checkpoint: checkpoints.json handler: # parameters for custom handler code model_name: "facebook/opt-30b" model_path: "model/models--facebook--opt-30b/snapshots/ceea0a90ac0f6fae7c2c34bcb40477438c152546" max_length: 50 max_new_tokens: 10 manual_seed: 40

自訂處理常式

TorchServe 為使用熱門程式庫建置的大型模型推論提供基本處理常式處理常式公用程式。下面的範例示範自訂處理常式類別 TransformersSeqClassifierHandler 如何延伸 BaseDeepSpeedHandler 和使用處理常式公用程式。如需完整的程式碼範例,請參閱 PyTorch GitHub 文件中的 custom_handler.py 程式碼

class TransformersSeqClassifierHandler(BaseDeepSpeedHandler, ABC): """ Transformers handler class for sequence, token classification and question answering. """ def __init__(self): super(TransformersSeqClassifierHandler, self).__init__() self.max_length = None self.max_new_tokens = None self.tokenizer = None self.initialized = False def initialize(self, ctx: Context): """In this initialize function, the HF large model is loaded and partitioned using DeepSpeed. Args: ctx (context): It is a JSON Object containing information pertaining to the model artifacts parameters. """ super().initialize(ctx) model_dir = ctx.system_properties.get("model_dir") self.max_length = int(ctx.model_yaml_config["handler"]["max_length"]) self.max_new_tokens = int(ctx.model_yaml_config["handler"]["max_new_tokens"]) model_name = ctx.model_yaml_config["handler"]["model_name"] model_path = ctx.model_yaml_config["handler"]["model_path"] seed = int(ctx.model_yaml_config["handler"]["manual_seed"]) torch.manual_seed(seed) logger.info("Model %s loading tokenizer", ctx.model_name) self.tokenizer = AutoTokenizer.from_pretrained(model_name) self.tokenizer.pad_token = self.tokenizer.eos_token config = AutoConfig.from_pretrained(model_name) with torch.device("meta"): self.model = AutoModelForCausalLM.from_config( config, torch_dtype=torch.float16 ) self.model = self.model.eval() ds_engine = get_ds_engine(self.model, ctx) self.model = ds_engine.module logger.info("Model %s loaded successfully", ctx.model_name) self.initialized = True def preprocess(self, requests): """ Basic text preprocessing, based on the user's choice of application mode. Args: requests (list): A list of dictionaries with a "data" or "body" field, each containing the input text to be processed. Returns: tuple: A tuple with two tensors: the batch of input ids and the batch of attention masks. """ def inference(self, input_batch): """ Predicts the class (or classes) of the received text using the serialized transformers checkpoint. Args: input_batch (tuple): A tuple with two tensors: the batch of input ids and the batch of attention masks, as returned by the preprocess function. Returns: list: A list of strings with the predicted values for each input text in the batch. """ def postprocess(self, inference_output): """Post Process Function converts the predicted response into Torchserve readable format. Args: inference_output (list): It contains the predicted response of the input text. Returns: (list): Returns a list of the Predictions and Explanations. """

準備您的模型成品

在 SageMaker AI 上部署模型之前,您必須封裝模型成品。對於大型模型,我們建議您使用 PyTorch torch-model-archiver 工具搭配引數 --archive-format no-archive,這會略過壓縮模型成品。下列範例會將所有模型成品儲存到名為 opt/ 的新資料夾中。

torch-model-archiver --model-name opt --version 1.0 --handler custom_handler.py --extra-files ds-config.json -r requirements.txt --config-file opt/model-config.yaml --archive-format no-archive

建立 opt/ 資料夾後,請使用 PyTorch Download_model 工具將 OPT-30b 模型下載到資料夾中。

cd opt python path_to/Download_model.py --model_path model --model_name facebook/opt-30b --revision main

最後,將模型成品上傳至 Amazon S3 儲存貯體。

aws s3 cp opt {your_s3_bucket}/opt --recursive

您現在應該將模型成品存放在 Amazon S3 中,準備好部署到 SageMaker AI 端點。

使用 SageMaker Python SDK 部署模型

準備模型成品後,您可以將模型部署到 SageMaker AI 託管端點。本節說明如何將單一大型模型部署到端點,並進行串流回應預測。如需更多端點串流回應的相關資訊,請參閱調用即時端點

若要部署模型,請完成下列步驟:

  1. 建立 SageMaker AI 工作階段,如下列範例所示。

    import boto3 import sagemaker from sagemaker import Model, image_uris, serializers, deserializers boto3_session=boto3.session.Session(region_name="us-west-2") smr = boto3.client('sagemaker-runtime-demo') sm = boto3.client('sagemaker') role = sagemaker.get_execution_role() # execution role for the endpoint sess= sagemaker.session.Session(boto3_session, sagemaker_client=sm, sagemaker_runtime_client=smr) # SageMaker AI session for interacting with different AWS APIs region = sess._region_name # region name of the current SageMaker Studio Classic environment account = sess.account_id() # account_id of the current SageMaker Studio Classic environment # Configuration: bucket_name = sess.default_bucket() prefix = "torchserve" output_path = f"s3://{bucket_name}/{prefix}" print(f'account={account}, region={region}, role={role}, output_path={output_path}')
  2. 在 SageMaker AI 中建立未壓縮模型,如下列範例所示。

    from datetime import datetime instance_type = "ml.g5.24xlarge" endpoint_name = sagemaker.utils.name_from_base("ts-opt-30b") s3_uri = {your_s3_bucket}/opt model = Model( name="torchserve-opt-30b" + datetime.now().strftime("%Y-%m-%d-%H-%M-%S"), # Enable SageMaker uncompressed model artifacts model_data={ "S3DataSource": { "S3Uri": s3_uri, "S3DataType": "S3Prefix", "CompressionType": "None", } }, image_uri=container, role=role, sagemaker_session=sess, env={"TS_INSTALL_PY_DEP_PER_MODEL": "true"}, ) print(model)
  3. 將模型部署至 Amazon EC2 執行個體,如以下範例所示。

    model.deploy( initial_instance_count=1, instance_type=instance_type, endpoint_name=endpoint_name, volume_size=512, # increase the size to store large model model_data_download_timeout=3600, # increase the timeout to download large model container_startup_health_check_timeout=600, # increase the timeout to load large model )
  4. 初始化類別以處理串流回應,如下列範例所示。

    import io class Parser: """ A helper class for parsing the byte stream input. The output of the model will be in the following format: ``` b'{"outputs": [" a"]}\n' b'{"outputs": [" challenging"]}\n' b'{"outputs": [" problem"]}\n' ... ``` While usually each PayloadPart event from the event stream will contain a byte array with a full json, this is not guaranteed and some of the json objects may be split across PayloadPart events. For example: ``` {'PayloadPart': {'Bytes': b'{"outputs": '}} {'PayloadPart': {'Bytes': b'[" problem"]}\n'}} ``` This class accounts for this by concatenating bytes written via the 'write' function and then exposing a method which will return lines (ending with a '\n' character) within the buffer via the 'scan_lines' function. It maintains the position of the last read position to ensure that previous bytes are not exposed again. """ def __init__(self): self.buff = io.BytesIO() self.read_pos = 0 def write(self, content): self.buff.seek(0, io.SEEK_END) self.buff.write(content) data = self.buff.getvalue() def scan_lines(self): self.buff.seek(self.read_pos) for line in self.buff.readlines(): if line[-1] != b'\n': self.read_pos += len(line) yield line[:-1] def reset(self): self.read_pos = 0
  5. 測試串流回應預測,如以下範例所示。

    import json body = "Today the weather is really nice and I am planning on".encode('utf-8') resp = smr.invoke_endpoint_with_response_stream(EndpointName=endpoint_name, Body=body, ContentType="application/json") event_stream = resp['Body'] parser = Parser() for event in event_stream: parser.write(event['PayloadPart']['Bytes']) for line in parser.scan_lines(): print(line.decode("utf-8"), end=' ')

您現在已將模型部署到 SageMaker AI 端點,並應該能夠叫用它來回應。如需 SageMaker AI 即時端點的詳細資訊,請參閱單一模型端點