Définition d'un pipeline - Amazon SageMaker

Définition d'un pipeline

Pour orchestrer vos flux avec Amazon SageMaker Model Building Pipelines, vous devez générer un graphe orienté acyclique (DAG) sous la forme d'une définition de pipeline JSON. L'image suivante représente le DAG de pipeline que vous créez dans ce didacticiel :

Vous pouvez générer votre définition de pipeline JSON à l'aide du kit SDK Python SageMaker. Le didacticiel suivant montre comment générer une définition de pipeline pour un pipeline qui résout un problème de régression afin de déterminer l'âge d'un ormeau en fonction de ses mesures physiques. Pour un bloc-notes Jupyter qui inclut le contenu de ce didacticiel que vous pouvez exécuter, veuillez consulter Orchestrating Jobs with Amazon SageMaker Model Building Pipelines.

Prerequisites

Pour exécuter le didacticiel suivant, vous devez effectuer les opérations ci-dessous :

  • Configurez votre instance de bloc-notes comme indiqué dans Création d'une instance de bloc-notes. Cela donne à votre rôle l'autorisation de lire et d'écrire sur Amazon S3, mais aussi de créer des tâches d'entraînement, de transformation par lots et de traitement dans SageMaker.

  • Accordez à votre bloc-notes des autorisations pour obtenir et transmettre son propre rôle comme indiqué dans Modification d'une politique d'autorisations de rôle. Ajoutez l'extrait JSON suivant pour attacher cette politique à votre rôle. Remplacez <your-role-arn> par l'ARN utilisé pour créer votre instance de bloc-notes.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "<your-role-arn>" } ] }
  • Faites confiance au principal de service SageMaker en suivant les étapes décrites dans Modification d'une politique d'autorisations de rôle. Ajoutez le fragment d'instruction suivante à la relation de confiance de votre rôle :

    { "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }

Configuration de votre environnement

Créez une session SageMaker à l'aide du bloc de code suivant. Cela renvoie l'ARN du rôle pour la session. L'ARN de ce rôle doit être l'ARN du rôle d'exécution que vous avez configuré comme condition préalable.

import boto3 import sagemaker import sagemaker.session region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() model_package_group_name = f"AbaloneModelPackageGroupName"

Création d'un pipeline

Exécutez les étapes suivantes à partir de votre instance de bloc-notes SageMaker pour créer un pipeline, y compris les étapes de prétraitement, d'entraînement, d'évaluation, d'évaluation conditionnelle et d'enregistrement de modèle.

Étape 1 : télécharger le jeu de données

Ce bloc-notes utilise le jeu de données Ormeau de machine learning de l'UCI. Le jeu de données contient les fonctions suivantes :

  • length – Mesure de la coquille la plus longue de l'ormeau.

  • diameter – Le diamètre de l'ormeau perpendiculaire à sa longueur.

  • height – La hauteur de l'ormeau avec de la viande dans la coquille.

  • whole_weight – Le poids de l'ormeau entier.

  • shucked_weight – Le poids de la viande retirée de l'ormeau.

  • viscera_weight – Poids des viscères d'ormeau après saignement.

  • shell_weight – Poids de la coquille de l'ormeau après avoir enlevé et séché la viande.

  • sex – Le sexe de l'ormeau. Une valeur « M », « F » ou « I », où « I » est un jeune ormeau.

  • rings – Le nombre d'anneaux dans la coquille de l'ormeau.

Le nombre d'anneaux dans la coquille de l'ormeau est une bonne approximation de son âge en utilisant la formule age=rings + 1.5. Cependant, l'obtention de ce nombre est une tâche fastidieuse. Vous devez couper la coquille à travers le cône, tacher la section et compter le nombre d'anneaux à l'aide d'un microscope. Cependant, les autres mesures physiques sont plus faciles à déterminer. Ce bloc-notes utilise le jeu de données pour créer un modèle prédictif des anneaux variables à l'aide des autres mesures physiques.

Pour télécharger le jeu de données

  1. Téléchargez le jeu de données dans le compartiment Amazon S3 par défaut de votre compte.

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. Téléchargez un deuxième jeu de données pour la transformation par lots après la création de votre modèle.

    local_path = "data/abalone-dataset-batch" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

Étape 2 : définir des paramètres de pipeline

Ce bloc de code définit les paramètres suivants pour votre pipeline :

  • processing_instance_type – Le type d'instance ml.* des tâches de traitement.

  • processing_instance_count – Le nombre d'instances de la tâche de traitement.

  • training_instance_type – Le type d'instance ml.* des tâches d'entraînement.

  • input_data – L'emplacement Amazon S3 des données d'entrée.

  • batch_data – L'emplacement Amazon S3 des données d'entrée pour la transformation par lots.

  • model_approval_status – Le statut d'approbation pour enregistrer le modèle entraîné avec pour CI/CD. Pour de plus amples informations, veuillez consulter . Automatiser les MLOps avec des projets SageMaker.

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) processing_instance_type = ParameterString( name="ProcessingInstanceType", default_value="ml.m5.xlarge" ) training_instance_type = ParameterString( name="TrainingInstanceType", default_value="ml.m5.xlarge" ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

Étape 3 : définir une étape de traitement pour l'ingénierie des fonctionnalités

Cette section vous indique comment créer une étape de traitement afin de préparer les données du jeu de données en vue de l'entraînement.

Pour créer une étape de traitement

  1. Créez un répertoire pour le script de traitement.

    !mkdir -p abalone
  2. Dans le répertoire /abalone, créez un fichier nommé preprocessing.py avec le contenu suivant. Ce script de prétraitement est transmis à l'étape de traitement pour l'exécution sur les données d'entrée. L'étape d'entraînement utilise ensuite les fonctions et les étiquettes d'entraînement prétraitées pour entraîner un modèle, tandis que l'étape d'évaluation utilise le modèle entraîné ainsi que les fonctions et les étiquettes de test prétraitées pour évaluer le modèle. Le script utilise scikit-learn pour effectuer les opérations suivantes :

    • Compléter les données de catégorie sex manquantes et les encoder pour qu'elles soient adaptées à l'entraînement.

    • Mettre à l'échelle et normaliser tous les champs numériques à l'exception de rings et sex.

    • Diviser les données en jeux de données d'entraînement, de validation et de test.

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. Créer une instance d'un SKLearnProcessor pour la transmettre à l'étape de traitement.

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type=processing_instance_type, instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", role=role, )
  4. Créer une étape de traitement. Cette étape adopte le SKLearnProcessor, les canaux d'entrée et de sortie, ainsi que le script preprocessing.py que vous avez créé. Ceci est très similaire à la méthode run de l'instance de processeur dans le kit SDK Python SageMaker. Le paramètre input_data transmis dans ProcessingStep correspond aux données d'entrée de l'étape elle-même. Ces données d'entrée sont utilisées par l'instance du processeur lors de son exécution.

    Notez les canaux nommés "train, "validation et "test" spécifiés dans la configuration de sortie pour la tâche de traitement. Les Properties de l'étape telles que celles-ci peuvent être utilisées dans les étapes suivantes et correspondre à leurs valeurs d'exécution lors de l'exécution.

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", )

Étape 4 : définir une étape d'entraînement

Cette section explique comment utiliser l'algorithme XGBoost de SageMaker pour entraîner un modèle de régression logistique sur les données d'entraînement issues des étapes de traitement.

Pour définir une étape d'entraînement

  1. Spécifiez le chemin d'accès au modèle dans lequel vous souhaitez enregistrer les modèles de l'entraînement.

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. Configurez un estimateur pour l'algorithme XGBoost et le jeu de données en entrée. Le training_instance_type est transmis à l'estimateur. Un script d'entraînement classique charge les données des canaux d'entrée, configure l'entraînement avec des hyperparamètres, entraîne un modèle et enregistre un modèle dans model_dir afin qu'il puisse être hébergé plus tard. SageMaker télécharge le modèle sur Amazon S3 sous la forme d'un model.tar.gz à la fin de la tâche d'entraînement.

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type=training_instance_type, ) xgb_train = Estimator( image_uri=image_uri, instance_type=training_instance_type, instance_count=1, output_path=model_path, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. Créez une TrainingStep à l'aide de l'instance d'estimateur et des propriétés de la ProcessingStep. En particulier, transmettez le S3Uri du canal de sortie "train" et "validation" sur la TrainingStep

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep step_train = TrainingStep( name="AbaloneTrain", estimator=xgb_train, inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, )

Étape 5 : définir une étape de traitement pour l'évaluation du modèle

Cette section vous explique comment créer une étape de traitement pour évaluer la précision du modèle. Le résultat de cette évaluation de modèle est utilisé dans l'étape de condition pour déterminer le chemin d'exécution à prendre.

Pour définir une étape de traitement pour l'évaluation du modèle

  1. Créez un fichier dans le répertoire /abalone nommé evaluation.py. Ce script est utilisé dans une étape de traitement pour effectuer l'évaluation du modèle. Il prend un modèle entraîné et le jeu de données de test comme entrée, puis produit un fichier JSON contenant des métriques d'évaluation de classification. Ces métriques comprennent une précision, un rappel et un score F1 pour chaque étiquette, ainsi que la précision et la courbe ROC pour le modèle.

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. Créez une instance de ScriptProcessor qui est utilisée pour créer une ProcessingStep.

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type=processing_instance_type, instance_count=1, base_job_name="script-abalone-eval", role=role, )
  3. Création d'une ProcessingStep à l'aide de l'instance du processeur, des canaux d'entrée et de sortie et du script evaluation.py. En particulier, transmettez la propriété S3ModelArtifacts depuis l'étape d'entraînement step_train, ainsi que le S3Uri du canal de sortie "test" de l'étape de traitement step_process. Ceci est très similaire à la méthode run de l'instance de processeur dans le kit SDK Python SageMaker. 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) step_eval = ProcessingStep( name="AbaloneEval", processor=script_eval, inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", property_files=[evaluation_report], )

Étape 6 : définir une CreateModelStep pour la transformation par lots

Cette section explique comment créer un modèle SageMaker à partir de la sortie de l'étape d'entraînement. Ce modèle est utilisé pour la transformation par lots sur un nouveau jeu de données. Cette étape est transmise à l'étape de condition et ne s'exécute que si l'étape de condition a la valeur true.

Pour définir une CreateModelStep pour la transformation par lots

  1. Créez un modèle SageMaker. Transmettez la propriété S3ModelArtifacts depuis l'étape d'entraînement step_train.

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=sagemaker_session, role=role, )
  2. Définissez l'entrée du modèle pour votre modèle SageMaker.

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. Créez votre CreateModelStep à l'aide de CreateModelInput et de l'instance de modèle SageMaker que vous avez définie.

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

Étape 7 : définir une TransformStep pour effectuer une transformation par lots

Cette section explique comment créer une TransformStep pour effectuer une transformation par lots sur un jeu de données après l'entraînement du modèle. Cette étape est transmise à l'étape de condition et ne s'exécute que si l'étape de condition a la valeur true.

Pour définir une TransformStep pour effectuer une transformation par lots

  1. Créez une instance de transformateur avec le type d'instance de calcul approprié, le nombre d'instances et l'URI de compartiment Amazon S3 de sortie souhaitée. Transmettez la propriété ModelName depuis l'étape step_create_model CreateModel.

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. Créez une TransformStep à l'aide de l'instance de transformateur que vous avez définie et du paramètre de pipeline batch_data.

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

Étape 8 : définir une étape RegisterModel pour créer un package de modèle

Cette section montre comment construire une instance de  RegisterModel. Le résultat de l'exécution de RegisterModel dans un pipeline est un package de modèle. Un package de modèle est une abstraction d'artefacts de modèle réutilisable qui contient tous les ingrédients nécessaires à l'inférence. Il se compose d'une spécification d'inférence qui définit l'image d'inférence à utiliser avec un emplacement de pondération de modèle facultatif. Un groupe de packages de modèles est une collection de packages de modèles. Vous pouvez utiliser un ModelPackageGroup pour SageMaker Pipelines afin d'ajouter une nouvelle version et un package de modèles au groupe pour chaque exécution de pipeline. Pour de plus amples informations sur le registre de modèles, veuillez consulter Enregistrer et déployer des modèles avec Model Registry.

Cette étape est transmise à l'étape de condition et ne s'exécute que si l'étape de condition a la valeur true.

Pour définir une étape RegisterModel pour créer un package de modèles

  • Créez une RegisterModel à l'aide de l'instance d'estimateur que vous avez utilisée pour l'étape d'entraînement. Transmettez la propriété S3ModelArtifacts depuis l'étape d'entraînement step_train et spécifiez un ModelPackageGroup. SageMaker Pipelines crée ce ModelPackageGroup pour vous.

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

Étape 9 : définir une étape de condition pour vérifier la précision du modèle

Une ConditionStep permet à SageMaker Pipelines de prendre en charge l'exécution conditionnelle dans votre DAG de pipeline en fonction de la condition des propriétés d'étape. Dans ce cas, vous ne souhaitez enregistrer un package de modèles que si la précision de ce modèle, telle que déterminée par l'étape d'évaluation du modèle, dépasse la valeur requise. Si la précision dépasse la valeur requise, le pipeline crée également un modèle SageMaker et exécute la transformation par lots sur un jeu de données. Cette section explique comment définir l'étape Condition.

Pour définir une étape de condition pour vérifier la précision du modèle

  1. Définissez une condition ConditionLessThanOrEqualToen utilisant la valeur de précision trouvée dans la sortie de l'étape de traitement de l'évaluation du modèle, step_eval. Obtenez cette sortie en utilisant le fichier de propriétés que vous avez indexé à l'étape de traitement et le JSONPath respectif de la valeur d'erreur quadratique moyenne, "mse".

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ( ConditionStep, JsonGet, ) cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step=step_eval, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. Créez une ConditionStep. Transmettez la condition ConditionEquals, puis définissez les étapes d'enregistrement de package de modèle et de transformation par lots comme les étapes suivantes si la condition est satisfaite.

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

Étape 10 : créer un pipeline

Maintenant que vous avez créé toutes les étapes, combinez-les dans un pipeline.

Pour créer un pipeline

  1. Définissez les éléments suivants pour votre pipeline :name, parameters, et steps. Les noms doivent être uniques au sein d'une paire (account, region).

    Note

    Une étape ne peut apparaître qu'une seule fois dans la liste des étapes du pipeline ou dans les listes d'étapes if/else de l'étape de condition. Elle ne peut pas apparaître dans les deux.

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_type, processing_instance_count, training_instance_type, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (Facultatif) Examinez la définition de pipeline JSON pour vous assurer qu'elle est bien formée.

    import json json.loads(pipeline.definition())

Cette définition de pipeline est prête à être envoyée à SageMaker. Dans le didacticiel suivant, vous envoyez ce pipeline à SageMaker et lancez une exécution.

Étape suivante: Exécuter un pipeline