定義模型建置管線 - Amazon SageMaker

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

定義模型建置管線

若要使用 Amazon SageMaker 模型建置管道協調工作流程,您需要以 JSON 管道定義的形式產生有向無環圖 (DAG)。 下列影像是您在此教學課程中建立的管線 DAG 的表示法:

管線定向無環圖 (DAG) 範例。

您可以使用 SageMaker Python SDK 產生您的 JSON 管線定義。 以下教學課程說明如何產生管線定義,以解決迴歸問題,以根據鮑魚的實際測量值判斷鮑魚的年齡。如需包含本教學課程中可執行之內容的 Jupyter 筆記本,請參閱使用 Amazon SageMaker 模型建置管道協調任務

必要條件

若要執行下列教學課程,您必須執行以下動作:

  • 依照建立筆記本執行個體中所述的內容,設定筆記本執行個體。這為您的角色提供讀取和寫入 Amazon S3 的權限,以及在中建立訓練、批次轉換和處理任務 SageMaker。

  • 修改角色許可政策所示,授予您的筆記本取得及傳遞其角色的權限。新增下列 JSON 程式碼片段,以將此政策附加到您的角色。以用來建立筆記本執行個體的 ARN 取代 <your-role-arn>

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "<your-role-arn>" } ] }
  • 遵循修改角色信任原則中的步驟,信任 SageMaker 服務主體。將下列陳述式片段新增至角色的信任關係:

    { "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }

設定您的環境

使用下面的代碼塊創建一個新的 SageMaker 會話。這將返回作業階段的角色 ARN。此角色 ARN 應為您設定為先決條件的執行角色 ARN。

import boto3 import sagemaker import sagemaker.session from sagemaker.workflow.pipeline_context import PipelineSession region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() pipeline_session = PipelineSession() model_package_group_name = f"AbaloneModelPackageGroupName"

建立管道

重要

允許 Amazon SageMaker 工作室或 Amazon 工作 SageMaker 室經典版創建 Amazon SageMaker 資源的自定義 IAM 政策還必須授予許可才能向這些資源添加標籤。需要向資源添加標籤的權限,因為 Studio 和 Studio 經典版會自動標記它們創建的任何資源。如果 IAM 政策允許 Studio 和 Studio 經典版建立資源,但不允許標記,則在嘗試建立資源時可能會發生 AccessDenied "" 錯誤。如需詳細資訊,請參閱 提供標記資 SageMaker源的權限

AWS Amazon 的受管政策 SageMaker授予建立 SageMaker 資源的權限,已包含在建立這些資源時新增標籤的權限。

從 SageMaker 筆記本執行個體執行下列步驟,以建立管道,包括預先處理、訓練、評估、條件式評估和模型註冊的步驟。

第 1 步:下載資料集

此筆記本使用 UCI Machine Learning 鮑魚資料集。此資料集含下列功能:

  • length – 測量的最長鮑魚外殼尺寸。

  • diameter – 與長度垂直的鮑魚直徑。

  • height – 鮑魚殼中鮑魚肉的高度。

  • whole_weight – 整個鮑魚的重量。

  • shucked_weight – 從鮑魚中取出的肉的重量。

  • viscera_weight – 鮑魚內臟經過出血處理後的重量。

  • shell_weight – 除肉和乾燥後鮑魚殼的重量。

  • sex – 鮑魚的性別。'M'、'F' 或 'I' 之一,其中 'I' 表示幼鮑。

  • rings – 鮑魚殼上的環數。

使用公式 age=rings + 1.5,可以根據鮑魚殼上的環數得到鮑魚年齡的良好近似值。然而,取得此數字的任務非常耗時。您必須沿著螺錐切割貝殼,染色切片,然後在顯微鏡下計算環數。但是,其他的物理測量值比較容易確定。這個筆記本使用資料集,透過其他物理量測建立一個預測環數的模型。

下載資料集
  1. 將資料集下載到帳戶的預設 Amazon S3 儲存貯體中。

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. 建立模型後,下載第二個資料集以進行批次轉換。

    local_path = "data/abalone-dataset-batch.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

第 2 步:定義管道參數

此程式碼區塊會為您的管道定義下列參數:

  • processing_instance_count – 處理任務的執行個體計數。

  • input_data – 輸入資料的 Amazon S3 位置。

  • batch_data – 用於批次轉換之輸入資料的 Amazon S3 位置。

  • model_approval_status – 將已訓練模型進行 CI/CD 註冊的核准狀態。如需更多資訊,請參閱使用專案自動執行 MLOP SageMaker

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

第 3 步:定義功能工程設計的處理步驟

本節展示如何建立處理步驟,以透過資料集準備用於進行訓練的資料。

建立處理步驟
  1. 建立處理命令碼的目錄。

    !mkdir -p abalone
  2. /abalone 目錄中,建立名為 preprocessing.py 的檔案,內含下列內容。此預處理命令碼會傳入至處理步驟,並基於輸入資料開始執行。然後,訓練步驟會使用預先處理的訓練功能和標籤來訓練模型,評估步驟會使用訓練過的模型和預先處理的測試功能和標籤來評估模型。此指令碼使用 scikit-learn 執行下列動作:

    • 填寫缺少的 sex 分類資料並進行編碼,以適合訓練之用。

    • 對除ringssex 之外的所有數字欄位執行縮放和標準化處理。

    • 將資料分割為訓練、測試和驗證資料集。

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. SKLearnProcessor 建立要傳入處理步驟的執行個體。

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", sagemaker_session=pipeline_session, role=role, )
  4. 建立處理步驟。此步驟會接受 SKLearnProcessor、輸入和輸出通道,以及您建立的 preprocessing.py 指令碼。這與 SageMaker Python SDK 中的處理器實例的run方法非常相似。傳入 ProcessingStepinput_data 參數是步驟本身的輸入資料。此輸入資料會在處理器執行個體執行時使用。

    請注意在處理任務的輸出組態中指定的 "train"validation"test" 具名通道。這類 Properties 步驟可以在後續步驟中使用,並在執行時解析為其執行期值。

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep processor_args = sklearn_processor.run( inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", ) step_process = ProcessingStep( name="AbaloneProcess", step_args=processor_args )

第 4 步:定義訓練步驟

本節說明如何使用 SageMaker XGBoost 演算法來訓練處理步驟輸出的訓練資料的模型。

定義訓練步驟
  1. 指定儲存訓練模型的模型路徑。

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. 設定 XGBoost 演算法和輸入資料集的估算器。訓練執行個體類型會傳遞至此估算器。典型的訓練指令碼會從輸入通道載入資料、使用超參數設定訓練、訓練模型,以及儲存模型,以model_dir便稍後可以託管。 SageMaker 在訓練任務結束model.tar.gz時,以一種形式將模型上傳到 Amazon S3。

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type="ml.m5.xlarge" ) xgb_train = Estimator( image_uri=image_uri, instance_type="ml.m5.xlarge", instance_count=1, output_path=model_path, sagemaker_session=pipeline_session, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. 使用 TrainingStep 的估算器執行個體和屬性建立 ProcessingStep。特別是,將 "train""validation" 輸出頻道的 S3Uri 傳遞給 TrainingStep。 

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep train_args = xgb_train.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, ) step_train = TrainingStep( name="AbaloneTrain", step_args = train_args )

第 5 步:定義模型評估的處理步驟

本節將介紹如何建立處理步驟來評估模型的準確性。此模型評估的結果會用於條件步驟,以決定要採用的執行路徑。

定義模型評估的處理步驟
  1. /abalone 目錄中建立名為 evaluation.py 的檔案。此指令碼用於處理步驟,以執行模型評估。它會採用訓練過的模型和測試資料集作為輸入,然後生成一個包含分類評估指標的 JSON 檔案。

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. 建立一個 ScriptProcessor 執行個體,用來建立 ProcessingStep

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type="ml.m5.xlarge", instance_count=1, base_job_name="script-abalone-eval", sagemaker_session=pipeline_session, role=role, )
  3. 建立ProcessingStep使用處理器執行個體、輸入和輸出通道以及指evaluation.py令碼。 特別是,從step_train訓練步驟傳入S3ModelArtifacts屬性,以及step_process處理步驟S3Uri"test"輸出通道。 這與 SageMaker Python SDK 中的處理器實例的run方法非常相似。 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) eval_args = script_eval.run( inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", ) step_eval = ProcessingStep( name="AbaloneEval", step_args=eval_args, property_files=[evaluation_report], )

步驟 6:定 CreateModelStep 義 Batch 轉換

重要

我們建議使模型步驟用從 Python SDK 的 2.90.0 版開始 SageMaker 創建模型。 CreateModelStep將繼續在舊版 SageMaker Python SDK 中運作,但不再受到主動支援。

本節說明如何從訓練步驟的輸出建立 SageMaker 模型。此模型用於根據新資料集進行批次轉換。此步驟會傳入條件步驟,且只有在條件步驟評估為 true 時才會執行。

若要定義批 CreateModelStep 次轉換
  1. 建立 SageMaker 模型。從 step_train 訓練步驟傳入 S3ModelArtifacts 屬性。

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=pipeline_session, role=role, )
  2. 定義模型的模 SageMaker 型輸入。

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. CreateModelStep使用您定義的CreateModelInput和 SageMaker 模型實例創建。

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

步驟 7:定義執 TransformStep 行 Batch 轉換

本節展示如何在模型訓練後建立 TransformStep,以根據資料集執行批次轉換。此步驟會傳入條件步驟,且只有在條件步驟評估為 true 時才會執行。

若要定義執 TransformStep 行批次轉換
  1. 使用適當的運算執行個體類型、執行個體計數和所需的輸出 Amazon S3 儲存貯體 URI 建立轉換器執行個體。從 step_create_model CreateModel 步驟傳入 ModelName 屬性。

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. 使用您定義的轉換器執行個體和 batch_data 管道參數建立 TransformStep

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

步驟 8:定義建立模型 Package 的 RegisterModel 步驟

重要

我們建議模型步驟使用從 Python SDK 的 2.90.0 版註冊模型。 SageMaker RegisterModel將繼續在舊版 SageMaker Python SDK 中運作,但不再受到主動支援。

本節展示如何建構 RegisterModel 的執行個體。在管道中執行 RegisterModel 得到的結果是一個模型套件。模型套件是可重複使用的模型成品抽象,可封裝推論所需的所有元件。它由推論規格以及可選模型加權位置組成,推論規格會定義要使用的推論映像。模型套件群組是模型套件的集合。您可以使用 ModelPackageGroup for P SageMaker ipeline 將新版本和模型套件新增至群組,以便每次執行管線。若要取得有關模型註冊表的更多相關資訊,請參閱使用模型註冊表註冊和部署模型

此步驟會傳入條件步驟,且只有在條件步驟評估為 true 時才會執行。

定義建立模型套件的 RegisterModel 步驟
  • 透過用於訓練步驟的估算器執行個體來建構 RegisterModel 步驟。從 step_train 訓練步驟傳入 S3ModelArtifacts 屬性並指定 ModelPackageGroup。 SageMaker 管道會為您建ModelPackageGroup立此項目。

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

第 9 步:定義條件步驟以驗證模型準確性

A ConditionStep 允許 SageMaker 管道根據步驟屬性的條件在管線 DAG 中支援條件式執行。在這種情況下,只有模型準確性 (在模型評估步驟中確定) 超過所需值時,您才會註冊模型套件。如果精確度超過所需值,管線也會建立 SageMaker Model 並在資料集上執行批次轉換。本節展示如何定義條件步驟。

定義條件步驟以驗證模型準確性
  1. 使用模型評估計算處理步驟 step_eval 之輸出中的準確性值來定義 ConditionLessThanOrEqualTo 條件。使用處理步驟中編製索引的屬性檔案以及均方錯誤值 "mse" 的相應 JSONPath 來取得此輸出。

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. 建構 ConditionStep。傳入 ConditionEquals 條件,然後將模型套件註冊和批次轉換步驟設定為條件通過時執行的後續步驟。

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

第 10 步:建立管道

您現在已建立了所有步驟,接下來將它們合併到一個管道中。

建立管道
  1. 為管道定義下列內容:nameparameters、和 steps(account, region) 對內的名稱必須是唯一的。

    注意

    一個步驟只能在管道的步驟清單或條件步驟的 if/else 步驟清單中出現一次。不能同時在這兩個清單中出現。

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (可選) 檢查 JSON 管道定義,以確保其格式正確。

    import json json.loads(pipeline.definition())

此管線定義已準備好提交給 SageMaker。在下一個教學課程中,您將提交此管線至 SageMaker 並開始執行。

後續步驟:執行管道