翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
パイプラインを定義する
Amazon SageMaker モデルビルディングパイプラインを使用してワークフローを調整するには、JSON パイプライン定義の形式で有向非循環グラフ (DAG) を生成する必要があります。 以下の画像は、このチュートリアルで作成するパイプライン DAG を表しています。
SageMaker Python SDK を使用して JSON パイプライン定義を生成できます。 次のチュートリアルでは、回帰問題を解決してアワビの物理的測定値に基づいて年齢を判断するパイプラインのパイプライン定義を生成する方法を示しています。実行できるこのチュートリアルのコンテンツを含む Jupyter ノートブックについては、「Amazon Model Building Pipeline によるジョブのオーケストレーション
前提条件
次のチュートリアルを実行するには、以下の操作を実行する必要があります。
-
「ノートブックインスタンスを作成する」の説明に従って、ノートブックインスタンスを設定します。これにより、ロールには Amazon S3 への読み取りと書き込み、およびでのトレーニング、バッチ変換、 SageMaker処理ジョブを作成する権限が付与されます。
-
「ロールのアクセス許可ポリシーの変更 (コンソール)」に示すように、独自のロールを取得および渡すアクセス許可をノートブックに付与します。次の JSON スニペットを追加して、このポリシーをロールにアタッチします。
<your-role-arn>
をノートブックインスタンスの作成に使用する ARN に置き換えます。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "
<your-role-arn>
" } ] } -
「ロールの信頼ポリシーの変更」の手順に従って、 SageMaker サービスプリンシパルを信頼してください。ロールの信頼関係に次のステートメントの断片を追加します。
{ "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }
環境のセットアップ
SageMaker 次のコードブロックを使用して新しいセッションを作成します。このコードブロックでは、セッションのロール ARN が返されます。このロール ARN は、前提条件として設定した実行ロール ARN になります。
import boto3 import sagemaker import sagemaker.session from sagemaker.workflow.pipeline_context import PipelineSession region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() pipeline_session = PipelineSession() model_package_group_name = f"AbaloneModelPackageGroupName"
パイプラインの作成
SageMaker ノートブックインスタンスから以下のステップを実行して、前処理、トレーニング、評価、条件付き評価、モデル登録のステップを含むパイプラインを作成します。
ステップ 1: データセットをダウンロードする
このノートブックでは、UCI Machine Learning Abalone Dataset を使用します。データセットには、次の特徴が含まれています。
-
length
- アワビの最長の殻の測定値。 -
diameter
- アワビの長さに垂直な直径。 -
height
- 殻に身が入った状態のアワビの高さ。 -
whole_weight
- アワビ全体の重量。 -
shucked_weight
- アワビから取り出した身の重量。 -
viscera_weight
- 血を抜いた後のアワビの内臓の重量。 -
shell_weight
- 身を取り除き乾燥させた後のアワビの殻の重量。 -
sex
- アワビの性別。「M」、「F」、「I」のいずれか。「I」は子供のアワビを表す。 -
rings
- アワビの殻の輪の数。
アワビの殻の輪の数によって、年齢の近似値が求められます (公式 age=rings + 1.5
を使用)。ただし、この数の取得には時間がかかります。コーンから殻を切断し、断面を染色して、顕微鏡で覗きながら輪の数を数えなければなりません。ただし、他の物理的な測定値は簡単に求められます。このノートブックではデータセットを使用し、他の物理的な測定値を用いた不定の輪の数の予測モデルを構築します。
データセットをダウンロードするには
-
アカウントのデフォルトの Amazon S3 バケットにデータセットをダウンロードします。
!mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
-
モデルを作成したら、バッチ変換用の 2 つ目のデータセットをダウンロードします。
local_path = "data/abalone-dataset-batch.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)
ステップ 2: パイプラインのパラメータを定義する
このコードブロックは、パイプラインの以下のパラメータを定義します。
-
processing_instance_count
- 処理ジョブのインスタンス数。 -
input_data
- 入力データの Amazon S3 の場所。 -
batch_data
- バッチ変換用の入力データの Amazon S3 の場所。 -
model_approval_status
- CI/CD のトレーニング済みモデルを登録するための承認ステータス。詳細については、「プロジェクトによる MLOP の自動化 SageMaker 」を参照してください。
from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )
ステップ 3: 特徴量エンジニアリングの処理ステップを定義する
このセクションでは、データセットからトレーニング用のデータを準備するための処理ステップの作成方法を説明します。
処理ステップを作成するには
処理スクリプト用のディレクトリを作成します。
!mkdir -p abalone
-
/abalone
ディレクトリに次の内容でpreprocessing.py
というファイルを作成します。この前処理スクリプトは、入力データに対する実行の処理ステップに渡されます。その後、トレーニングステップで、前処理されたトレーニング用の特徴とラベルを使用してモデルがトレーニングされ、評価ステップで、トレーニング済みモデルと前処理されたテスト用の特徴とラベルを使用してモデルが評価されます。スクリプトはscikit-learn
を使用して次の処理を実行します。不足している
sex
カテゴリデータと入力し、トレーニング用にエンコードします。-
rings
とsex
を除くすべての数値フィールドをスケーリングして正規化します。 -
データをトレーニング、テスト、検証のデータセットに分割します。
%%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
-
SKLearnProcessor
のインスタンスを作成して処理ステップに渡します。from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", sagemaker_session=pipeline_session, role=role, )
-
処理ステップを作成します。このステップは、
SKLearnProcessor
、入出力チャネル、作成したpreprocessing.py
スクリプトを受け取ります。これは SageMaker Python SDKrun
のプロセッサインスタンスのメソッドとよく似ています。ProcessingStep
に渡されるinput_data
パラメータはステップ自体の入力データです。この入力データは、プロセッサインスタンスの実行時に使用されます。"train
、"validation
、"test"
で指定されるチャネルは、処理ジョブの出力設定で指定されたものになります。このようなステップProperties
は、後続のステップで使用し、実行時にランタイム値に解決できます。from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep processor_args = sklearn_processor.run( inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", ) step_process = ProcessingStep( name="AbaloneProcess", step_args=processor_args )
ステップ 4: トレーニングステップを定義する
このセクションでは、 SageMaker XGBoost アルゴリズムを使用して、処理ステップから出力されたトレーニングデータに基づいてモデルをトレーニングする方法を説明します。
トレーニングステップを定義するには
-
トレーニングからモデルを保存するモデルパスを指定します。
model_path = f"s3://{default_bucket}/AbaloneTrain"
-
XGBoost アルゴリズムの推定器と入力データセットを設定します。トレーニングインスタンスタイプは推定器に渡されます。一般的なトレーニングスクリプトは、入力チャンネルからデータを読み込み、ハイパーパラメーターを使用してトレーニングを設定し、モデルをトレーニングして、
model_dir
後でホストできるようにモデルを保存します。 SageMaker トレーニングジョブの最後に、モデルを Amazon S3 にアップロードします。model.tar.gz
from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type="ml.m5.xlarge" ) xgb_train = Estimator( image_uri=image_uri, instance_type="ml.m5.xlarge", instance_count=1, output_path=model_path, sagemaker_session=pipeline_session, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
-
推定器インスタンスと
ProcessingStep
のプロパティを使用してTrainingStep
を作成します。具体的には、"train"
のS3Uri
とTrainingStep
への"validation"
出力チャネルを渡します。from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep train_args = xgb_train.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, ) step_train = TrainingStep( name="AbaloneTrain", step_args = train_args )
ステップ 5: モデル評価の処理ステップを定義する
このセクションでは、モデルの精度を評価するための処理ステップの作成方法を説明します。このモデル評価の結果は、条件ステップで使用する実行パスを決定するために使用されます。
モデル評価の処理ステップを定義するには
-
/abalone
ディレクトリにevaluation.py
という名前のファイルを作成します。このスクリプトは、モデル評価を実行するための処理ステップで使用されます。トレーニング済みのモデルとテストデータセットを入力として受け取り、分類評価メトリクスを含む JSON ファイルを生成します。%%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
-
ProcessingStep
の作成に使用されるScriptProcessor
のインスタンスを作成します。from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type="ml.m5.xlarge", instance_count=1, base_job_name="script-abalone-eval", sagemaker_session=pipeline_session, role=role, )
-
プロセッサインスタンス、入力チャネルと出力チャネル、
ProcessingStep
evaluation.py
スクリプトを使用してを作成します。 特に、S3ModelArtifacts
step_train
トレーニングステップのプロパティだけでなく、S3Uri
"test"
step_process
処理ステップの出力チャンネルのプロパティも渡してください。 これは SageMaker Python SDKrun
のプロセッサインスタンスのメソッドとよく似ています。from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) eval_args = script_eval.run( inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", ) step_eval = ProcessingStep( name="AbaloneEval", step_args=eval_args, property_files=[evaluation_report], )
ステップ 6: Batch CreateModelStep 変換の定義
重要
Python SDK v2.90.0 以降では、モデルステップを使用してモデルを作成することをお勧めします。 SageMaker CreateModelStep
以前のバージョンの SageMaker Python SDK でも引き続き動作しますが、現在は積極的にサポートされていません。
このセクションでは、 SageMaker トレーニングステップの出力からモデルを作成する方法を説明します。このモデルは、新しいデータセットのバッチ変換に使用されます。このステップは条件ステップに渡され、条件ステップが true
と評価された場合にのみ実行されます。
CreateModelStep バッチ変換用を定義するには
-
SageMaker モデルを作成します。
step_train
トレーニングステップのS3ModelArtifacts
プロパティを渡します。from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=pipeline_session, role=role, )
-
モデルのモデル入力を定義します SageMaker 。
from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
-
CreateModelStep
CreateModelInput
SageMaker 定義したモデルインスタンスを使用して作成します。from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )
ステップ 7: Batch TransformStep 変換を実行するための定義
このセクションでは、モデルのトレーニング後にデータセットにバッチ変換を実行する TransformStep
の作成方法を説明します。このステップは条件ステップに渡され、条件ステップが true
と評価された場合にのみ実行されます。
TransformStep バッチ変換を実行するように定義するには
-
該当するコンピューティングインスタンスタイプ、インスタンス数、目的の出力 Amazon S3 バケット URI を使用して、トランスフォーマーインスタンスを作成します。
step_create_model
CreateModel
ステップのModelName
プロパティを渡します。from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
-
定義したトランスフォーマーインスタンスと
batch_data
パイプラインパラメータを使用して、TransformStep
を作成します。from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )
ステップ 8: モデルPackage RegisterModel を作成するステップの定義
重要
Python SDK の v2.90.0 以降では、モデルステップを使用してモデルを登録することをお勧めします。 SageMaker RegisterModel
以前のバージョンの SageMaker Python SDK でも引き続き動作しますが、現在は積極的にサポートされていません。
このセクションでは、RegisterModel
のインスタンスを構築する方法を説明します。パイプラインで RegisterModel
を実行すると、モデルパッケージが作成されます。モデルパッケージは、再利用可能なモデルアーティファクトを抽象化したものであり、推論に必要なすべての成分がまとめられています。オプションのモデルの重みの場所と共に使用する推論イメージを定義する推論仕様で構成されます。モデルパッケージグループは、モデルパッケージがまとめられたものです。ModelPackageGroup
for SageMaker Pipelines を使用すると、パイプラインを実行するたびに新しいバージョンとモデルパッケージをグループに追加できます。モデルのレジストリの詳細については、「Model Registry を使用したモデルの登録とデプロイ」をご参照ください。
このステップは条件ステップに渡され、条件ステップが true
と評価された場合にのみ実行されます。
RegisterModel モデルパッケージを作成するステップを定義するには
-
トレーニングステップに使用した推定器インスタンスを使用して
RegisterModel
ステップを作成します。step_train
トレーニングステップのS3ModelArtifacts
プロパティを渡し、ModelPackageGroup
を作成します。 SageMaker PipelinesModelPackageGroup
がこれを自動的に作成します。from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="
{}/evaluation.json
".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel
", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge
"], transform_instances=["ml.m5.xlarge
"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )
ステップ 9: モデルの精度を検証するための条件ステップを定義する
A ConditionStep
を使用すると、 SageMaker パイプラインはステッププロパティの条件に基づいてパイプライン DAG での条件付き実行をサポートできます。ここでは、モデル評価ステップで決定されるモデルの精度が必要な値を超過した場合にのみ、モデルパッケージを登録する必要があります。精度が必須値を超えると、 SageMaker パイプラインはモデルも作成し、データセットに対してバッチ変換を実行します。このセクションでは、条件ステップを定義する方法を説明します。
モデルの精度を検証するための条件ステップを定義するには
-
モデル評価の処理ステップ
step_eval
の出力で見つかった精度値を使用して、ConditionLessThanOrEqualTo
条件を定義します。この出力を取得するには、処理ステップでインデックス付けしたプロパティファイルと、平均二乗誤差値"mse"
のそれぞれの JSONPath を使用します。from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
-
ConditionStep
を作成します。ConditionEquals
条件を渡し、条件が満たされた場合の次のステップとなる、モデルパッケージの登録ステップとバッチ変換ステップを設定します。step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )
ステップ 10: パイプラインを作成する
以上ですべてのステップが作成できました。次は、それらのステップをパイプラインに結合します。
パイプラインを作成するには
-
パイプラインの
name
、parameters
、steps
を定義します。名前は、(account, region)
ペア内で一意である必要があります。注記
ステップは、パイプラインのステップリストまたは条件ステップの if/else ステップリストに 1 回のみ表示できます。また、両方に表示することはできません。
from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
-
(オプション) JSON パイプラインの定義を調べて、フォーマットに誤りがないことを確認します。
import json json.loads(pipeline.definition())
このパイプライン定義は送信する準備ができています SageMaker。次のチュートリアルでは、 SageMaker このパイプラインをに送信して実行を開始します。
次のステップ: パイプラインを実行する