パイプラインを定義する - Amazon SageMaker

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

パイプラインを定義する

Amazon SageMaker モデルビルディングパイプラインを使用してワークフローを調整するには、JSON パイプライン定義の形式で有向非循環グラフ (DAG) を生成する必要があります。 以下の画像は、このチュートリアルで作成するパイプライン DAG を表しています。

SageMaker Python SDK を使用して JSON パイプライン定義を生成できます。 次のチュートリアルでは、回帰問題を解決してアワビの物理的測定値に基づいて年齢を判断するパイプラインのパイプライン定義を生成する方法を示しています。実行できるこのチュートリアルのコンテンツを含む Jupyter ノートブックについては、「Amazon Model Building Pipeline によるジョブのオーケストレーション」を参照してください。 SageMaker

前提条件

次のチュートリアルを実行するには、以下の操作を実行する必要があります。

環境のセットアップ

SageMaker 次のコードブロックを使用して新しいセッションを作成します。このコードブロックでは、セッションのロール ARN が返されます。このロール ARN は、前提条件として設定した実行ロール ARN になります。

import boto3 import sagemaker import sagemaker.session from sagemaker.workflow.pipeline_context import PipelineSession region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() pipeline_session = PipelineSession() model_package_group_name = f"AbaloneModelPackageGroupName"

パイプラインの作成

SageMaker ノートブックインスタンスから以下のステップを実行して、前処理、トレーニング、評価、条件付き評価、モデル登録のステップを含むパイプラインを作成します。

ステップ 1: データセットをダウンロードする

このノートブックでは、UCI Machine Learning Abalone Dataset を使用します。データセットには、次の特徴が含まれています。

  • length - アワビの最長の殻の測定値。

  • diameter - アワビの長さに垂直な直径。

  • height - 殻に身が入った状態のアワビの高さ。

  • whole_weight - アワビ全体の重量。

  • shucked_weight - アワビから取り出した身の重量。

  • viscera_weight - 血を抜いた後のアワビの内臓の重量。

  • shell_weight - 身を取り除き乾燥させた後のアワビの殻の重量。

  • sex - アワビの性別。「M」、「F」、「I」のいずれか。「I」は子供のアワビを表す。

  • rings - アワビの殻の輪の数。

アワビの殻の輪の数によって、年齢の近似値が求められます (公式 age=rings + 1.5 を使用)。ただし、この数の取得には時間がかかります。コーンから殻を切断し、断面を染色して、顕微鏡で覗きながら輪の数を数えなければなりません。ただし、他の物理的な測定値は簡単に求められます。このノートブックではデータセットを使用し、他の物理的な測定値を用いた不定の輪の数の予測モデルを構築します。

データセットをダウンロードするには
  1. アカウントのデフォルトの Amazon S3 バケットにデータセットをダウンロードします。

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. モデルを作成したら、バッチ変換用の 2 つ目のデータセットをダウンロードします。

    local_path = "data/abalone-dataset-batch.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

ステップ 2: パイプラインのパラメータを定義する

このコードブロックは、パイプラインの以下のパラメータを定義します。

  • processing_instance_count - 処理ジョブのインスタンス数。

  • input_data - 入力データの Amazon S3 の場所。

  • batch_data - バッチ変換用の入力データの Amazon S3 の場所。

  • model_approval_status - CI/CD のトレーニング済みモデルを登録するための承認ステータス。詳細については、「プロジェクトによる MLOP の自動化 SageMaker 」を参照してください。

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

ステップ 3: 特徴量エンジニアリングの処理ステップを定義する

このセクションでは、データセットからトレーニング用のデータを準備するための処理ステップの作成方法を説明します。

処理ステップを作成するには
  1. 処理スクリプト用のディレクトリを作成します。

    !mkdir -p abalone
  2. /abalone ディレクトリに次の内容で preprocessing.py というファイルを作成します。この前処理スクリプトは、入力データに対する実行の処理ステップに渡されます。その後、トレーニングステップで、前処理されたトレーニング用の特徴とラベルを使用してモデルがトレーニングされ、評価ステップで、トレーニング済みモデルと前処理されたテスト用の特徴とラベルを使用してモデルが評価されます。スクリプトは scikit-learn を使用して次の処理を実行します。

    • 不足している sex カテゴリデータと入力し、トレーニング用にエンコードします。

    • ringssex を除くすべての数値フィールドをスケーリングして正規化します。

    • データをトレーニング、テスト、検証のデータセットに分割します。

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. SKLearnProcessor のインスタンスを作成して処理ステップに渡します。

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", sagemaker_session=pipeline_session, role=role, )
  4. 処理ステップを作成します。このステップは、SKLearnProcessor、入出力チャネル、作成した preprocessing.py スクリプトを受け取ります。これは SageMaker Python SDK run のプロセッサインスタンスのメソッドとよく似ています。ProcessingStep に渡される input_data パラメータはステップ自体の入力データです。この入力データは、プロセッサインスタンスの実行時に使用されます。

    "train"validation"test" で指定されるチャネルは、処理ジョブの出力設定で指定されたものになります。このようなステップ Properties は、後続のステップで使用し、実行時にランタイム値に解決できます。

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep processor_args = sklearn_processor.run( inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", ) step_process = ProcessingStep( name="AbaloneProcess", step_args=processor_args )

ステップ 4: トレーニングステップを定義する

このセクションでは、 SageMaker XGBoost アルゴリズムを使用して、処理ステップから出力されたトレーニングデータに基づいてモデルをトレーニングする方法を説明します。

トレーニングステップを定義するには
  1. トレーニングからモデルを保存するモデルパスを指定します。

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. XGBoost アルゴリズムの推定器と入力データセットを設定します。トレーニングインスタンスタイプは推定器に渡されます。一般的なトレーニングスクリプトは、入力チャンネルからデータを読み込み、ハイパーパラメーターを使用してトレーニングを設定し、モデルをトレーニングして、model_dir後でホストできるようにモデルを保存します。 SageMaker トレーニングジョブの最後に、モデルを Amazon S3 にアップロードします。model.tar.gz

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type="ml.m5.xlarge" ) xgb_train = Estimator( image_uri=image_uri, instance_type="ml.m5.xlarge", instance_count=1, output_path=model_path, sagemaker_session=pipeline_session, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. 推定器インスタンスと ProcessingStep のプロパティを使用して TrainingStep を作成します。具体的には、"train"S3UriTrainingStep への "validation" 出力チャネルを渡します。 

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep train_args = xgb_train.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, ) step_train = TrainingStep( name="AbaloneTrain", step_args = train_args )

ステップ 5: モデル評価の処理ステップを定義する

このセクションでは、モデルの精度を評価するための処理ステップの作成方法を説明します。このモデル評価の結果は、条件ステップで使用する実行パスを決定するために使用されます。

モデル評価の処理ステップを定義するには
  1. /abalone ディレクトリに evaluation.py という名前のファイルを作成します。このスクリプトは、モデル評価を実行するための処理ステップで使用されます。トレーニング済みのモデルとテストデータセットを入力として受け取り、分類評価メトリクスを含む JSON ファイルを生成します。

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. ProcessingStep の作成に使用される ScriptProcessor のインスタンスを作成します。

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type="ml.m5.xlarge", instance_count=1, base_job_name="script-abalone-eval", sagemaker_session=pipeline_session, role=role, )
  3. プロセッサインスタンス、入力チャネルと出力チャネル、ProcessingStepevaluation.pyスクリプトを使用してを作成します。 特に、S3ModelArtifactsstep_trainトレーニングステップのプロパティだけでなく、S3Uri"test"step_process処理ステップの出力チャンネルのプロパティも渡してください。 これは SageMaker Python SDK run のプロセッサインスタンスのメソッドとよく似ています。 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) eval_args = script_eval.run( inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", ) step_eval = ProcessingStep( name="AbaloneEval", step_args=eval_args, property_files=[evaluation_report], )

ステップ 6: Batch CreateModelStep 変換の定義

重要

Python SDK v2.90.0 以降では、モデルステップを使用してモデルを作成することをお勧めします。 SageMaker CreateModelStep以前のバージョンの SageMaker Python SDK でも引き続き動作しますが、現在は積極的にサポートされていません。

このセクションでは、 SageMaker トレーニングステップの出力からモデルを作成する方法を説明します。このモデルは、新しいデータセットのバッチ変換に使用されます。このステップは条件ステップに渡され、条件ステップが true と評価された場合にのみ実行されます。

CreateModelStep バッチ変換用を定義するには
  1. SageMaker モデルを作成します。step_train トレーニングステップの S3ModelArtifacts プロパティを渡します。

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=pipeline_session, role=role, )
  2. モデルのモデル入力を定義します SageMaker 。

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. CreateModelStepCreateModelInput SageMaker 定義したモデルインスタンスを使用して作成します。

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

ステップ 7: Batch TransformStep 変換を実行するための定義

このセクションでは、モデルのトレーニング後にデータセットにバッチ変換を実行する TransformStep の作成方法を説明します。このステップは条件ステップに渡され、条件ステップが true と評価された場合にのみ実行されます。

TransformStep バッチ変換を実行するように定義するには
  1. 該当するコンピューティングインスタンスタイプ、インスタンス数、目的の出力 Amazon S3 バケット URI を使用して、トランスフォーマーインスタンスを作成します。step_create_model CreateModel ステップの ModelName プロパティを渡します。

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. 定義したトランスフォーマーインスタンスとbatch_data パイプラインパラメータを使用して、TransformStep を作成します。

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

ステップ 8: モデルPackage RegisterModel を作成するステップの定義

重要

Python SDK の v2.90.0 以降では、モデルステップを使用してモデルを登録することをお勧めします。 SageMaker RegisterModel以前のバージョンの SageMaker Python SDK でも引き続き動作しますが、現在は積極的にサポートされていません。

このセクションでは、RegisterModel のインスタンスを構築する方法を説明します。パイプラインで RegisterModel を実行すると、モデルパッケージが作成されます。モデルパッケージは、再利用可能なモデルアーティファクトを抽象化したものであり、推論に必要なすべての成分がまとめられています。オプションのモデルの重みの場所と共に使用する推論イメージを定義する推論仕様で構成されます。モデルパッケージグループは、モデルパッケージがまとめられたものです。ModelPackageGroupfor SageMaker Pipelines を使用すると、パイプラインを実行するたびに新しいバージョンとモデルパッケージをグループに追加できます。モデルのレジストリの詳細については、「Model Registry を使用したモデルの登録とデプロイ」をご参照ください。

このステップは条件ステップに渡され、条件ステップが true と評価された場合にのみ実行されます。

RegisterModel モデルパッケージを作成するステップを定義するには
  • トレーニングステップに使用した推定器インスタンスを使用して RegisterModel ステップを作成します。step_train トレーニングステップの S3ModelArtifacts プロパティを渡し、ModelPackageGroup を作成します。 SageMaker Pipelines ModelPackageGroup がこれを自動的に作成します。

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

ステップ 9: モデルの精度を検証するための条件ステップを定義する

A ConditionStep を使用すると、 SageMaker パイプラインはステッププロパティの条件に基づいてパイプライン DAG での条件付き実行をサポートできます。ここでは、モデル評価ステップで決定されるモデルの精度が必要な値を超過した場合にのみ、モデルパッケージを登録する必要があります。精度が必須値を超えると、 SageMaker パイプラインはモデルも作成し、データセットに対してバッチ変換を実行します。このセクションでは、条件ステップを定義する方法を説明します。

モデルの精度を検証するための条件ステップを定義するには
  1. モデル評価の処理ステップ step_eval の出力で見つかった精度値を使用して、ConditionLessThanOrEqualTo 条件を定義します。この出力を取得するには、処理ステップでインデックス付けしたプロパティファイルと、平均二乗誤差値 "mse" のそれぞれの JSONPath を使用します。

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. ConditionStep を作成します。ConditionEquals 条件を渡し、条件が満たされた場合の次のステップとなる、モデルパッケージの登録ステップとバッチ変換ステップを設定します。

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

ステップ 10: パイプラインを作成する

以上ですべてのステップが作成できました。次は、それらのステップをパイプラインに結合します。

パイプラインを作成するには
  1. パイプラインの nameparameterssteps を定義します。名前は、(account, region) ペア内で一意である必要があります。

    注記

    ステップは、パイプラインのステップリストまたは条件ステップの if/else ステップリストに 1 回のみ表示できます。また、両方に表示することはできません。

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (オプション) JSON パイプラインの定義を調べて、フォーマットに誤りがないことを確認します。

    import json json.loads(pipeline.definition())

このパイプライン定義は送信する準備ができています SageMaker。次のチュートリアルでは、 SageMaker このパイプラインをに送信して実行を開始します。

次のステップ: パイプラインを実行する