Définition d'un pipeline - Amazon SageMaker

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Définition d'un pipeline

Pour orchestrer vos flux de travail avec Amazon SageMaker Model Building Pipelines, vous devez générer un graphe acyclique dirigé (DAG) sous la forme d'une définition de pipeline JSON. L'image suivante est une représentation du DAG de pipeline que vous créez dans ce didacticiel :

Vous pouvez générer votre définition de pipeline JSON à l'aide du SDK SageMaker Python. Le didacticiel suivant montre comment générer une définition de pipeline pour un pipeline qui résout un problème de régression afin de déterminer l'âge d'un ormeau sur la base de ses mesures physiques. Pour un bloc-notes Jupyter incluant le contenu de ce didacticiel que vous pouvez exécuter, consultez Orchestrating Jobs with Amazon SageMaker Model Building Pipelines.

Prérequis

Pour exécuter le tutoriel suivant, vous devez effectuer les opérations ci-dessous :

  • Configurez votre instance de bloc-notes comme indiqué dans Création d'une instance de bloc-notes. Cela donne à votre rôle les autorisations nécessaires pour lire et écrire sur Amazon S3, et pour créer des tâches de formation, de transformation par lots et de traitement dans SageMaker.

  • Accordez à votre bloc-notes des autorisations pour obtenir et transmettre son propre rôle comme indiqué dans Modification d'une politique d'autorisations de rôle. Ajoutez l'extrait JSON suivant pour attacher cette politique à votre rôle. Remplacez <your-role-arn> par l'ARN utilisé pour créer votre instance de bloc-notes.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "iam:GetRole", "iam:PassRole" ], "Resource": "<your-role-arn>" } ] }
  • Faites confiance au principal du SageMaker service en suivant les étapes décrites dans la section Modification d'une politique de confiance des rôles. Ajoutez le fragment d'instruction suivante à la relation de confiance de votre rôle :

    { "Sid": "", "Effect": "Allow", "Principal": { "Service": "sagemaker.amazonaws.com" }, "Action": "sts:AssumeRole" }

Configuration de votre environnement

Créez une nouvelle SageMaker session à l'aide du bloc de code suivant. Cela renvoie l'ARN du rôle pour la session. L'ARN de ce rôle doit être l'ARN du rôle d'exécution que vous avez configuré comme prérequis.

import boto3 import sagemaker import sagemaker.session region = boto3.Session().region_name sagemaker_session = sagemaker.session.Session() role = sagemaker.get_execution_role() default_bucket = sagemaker_session.default_bucket() model_package_group_name = f"AbaloneModelPackageGroupName"

Création d'un pipeline

Exécutez les étapes suivantes à partir de votre instance de SageMaker bloc-notes pour créer un pipeline comprenant des étapes de prétraitement, de formation, d'évaluation, d'évaluation conditionnelle et d'enregistrement du modèle.

Étape 1 : télécharger le jeu de données

Ce bloc-notes utilise le jeu de données Ormeau de machine learning de l'UCI. Le jeu de données contient les fonctions suivantes :

  • length – Mesure de la coquille la plus longue de l'ormeau.

  • diameter – Le diamètre de l'ormeau perpendiculaire à sa longueur.

  • height – La hauteur de l'ormeau avec de la viande dans la coquille.

  • whole_weight – Le poids de l'ormeau entier.

  • shucked_weight – Le poids de la viande retirée de l'ormeau.

  • viscera_weight – Poids des viscères d'ormeau après saignement.

  • shell_weight – Poids de la coquille de l'ormeau après avoir enlevé et séché la viande.

  • sex – Le sexe de l'ormeau. Une valeur « M », « F » ou « I », où « I » est un jeune ormeau.

  • rings – Le nombre d'anneaux dans la coquille de l'ormeau.

Le nombre d'anneaux dans la coquille de l'ormeau est une bonne approximation de son âge en utilisant la formule age=rings + 1.5. Cependant, l'obtention de ce nombre est une tâche fastidieuse. Vous devez couper la coquille à travers le cône, tacher la section et compter le nombre d'anneaux à l'aide d'un microscope. Cependant, les autres mesures physiques sont plus faciles à déterminer. Ce bloc-notes utilise le jeu de données pour créer un modèle prédictif des anneaux variables à l'aide des autres mesures physiques.

Pour télécharger le jeu de données
  1. Téléchargez le jeu de données dans le compartiment Amazon S3 par défaut de votre compte.

    !mkdir -p data local_path = "data/abalone-dataset.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset.csv", local_path ) base_uri = f"s3://{default_bucket}/abalone" input_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(input_data_uri)
  2. Téléchargez un deuxième jeu de données pour la transformation par lots après la création de votre modèle.

    local_path = "data/abalone-dataset-batch.csv" s3 = boto3.resource("s3") s3.Bucket(f"sagemaker-servicecatalog-seedcode-{region}").download_file( "dataset/abalone-dataset-batch", local_path ) base_uri = f"s3://{default_bucket}/abalone" batch_data_uri = sagemaker.s3.S3Uploader.upload( local_path=local_path, desired_s3_uri=base_uri, ) print(batch_data_uri)

Étape 2 : définir des paramètres de pipeline

Ce bloc de code définit les paramètres suivants pour votre pipeline :

  • processing_instance_count – Le nombre d'instances de la tâche de traitement.

  • input_data – L'emplacement Amazon S3 des données d'entrée.

  • batch_data – L'emplacement Amazon S3 des données d'entrée pour la transformation par lots.

  • model_approval_status – Le statut d'approbation pour enregistrer le modèle entraîné avec pour CI/CD. Pour plus d’informations, consultez Automatisez les MLOP avec des projets SageMaker .

from sagemaker.workflow.parameters import ( ParameterInteger, ParameterString, ) processing_instance_count = ParameterInteger( name="ProcessingInstanceCount", default_value=1 ) model_approval_status = ParameterString( name="ModelApprovalStatus", default_value="PendingManualApproval" ) input_data = ParameterString( name="InputData", default_value=input_data_uri, ) batch_data = ParameterString( name="BatchData", default_value=batch_data_uri, )

Étape 3 : définir une étape de traitement pour l'ingénierie des fonctionnalités

Cette section vous indique comment créer une étape de traitement afin de préparer les données du jeu de données en vue de l'entraînement.

Pour créer une étape de traitement
  1. Créez un répertoire pour le script de traitement.

    !mkdir -p abalone
  2. Dans le répertoire /abalone, créez un fichier nommé preprocessing.py avec le contenu suivant. Ce script de prétraitement est transmis à l'étape de traitement pour l'exécution sur les données d'entrée. L'étape d'entraînement utilise ensuite les fonctions et les étiquettes d'entraînement prétraitées pour entraîner un modèle, tandis que l'étape d'évaluation utilise le modèle entraîné ainsi que les fonctions et les étiquettes de test prétraitées pour évaluer le modèle. Le script utilise scikit-learn pour effectuer les opérations suivantes :

    • Compléter les données de catégorie sex manquantes et les encoder pour qu'elles soient adaptées à l'entraînement.

    • Mettre à l'échelle et normaliser tous les champs numériques à l'exception de rings et sex.

    • Diviser les données en jeux de données d'entraînement, de validation et de test.

    %%writefile abalone/preprocessing.py import argparse import os import requests import tempfile import numpy as np import pandas as pd from sklearn.compose import ColumnTransformer from sklearn.impute import SimpleImputer from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler, OneHotEncoder # Because this is a headerless CSV file, specify the column names here. feature_columns_names = [ "sex", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight", ] label_column = "rings" feature_columns_dtype = { "sex": str, "length": np.float64, "diameter": np.float64, "height": np.float64, "whole_weight": np.float64, "shucked_weight": np.float64, "viscera_weight": np.float64, "shell_weight": np.float64 } label_column_dtype = {"rings": np.float64} def merge_two_dicts(x, y): z = x.copy() z.update(y) return z if __name__ == "__main__": base_dir = "/opt/ml/processing" df = pd.read_csv( f"{base_dir}/input/abalone-dataset.csv", header=None, names=feature_columns_names + [label_column], dtype=merge_two_dicts(feature_columns_dtype, label_column_dtype) ) numeric_features = list(feature_columns_names) numeric_features.remove("sex") numeric_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="median")), ("scaler", StandardScaler()) ] ) categorical_features = ["sex"] categorical_transformer = Pipeline( steps=[ ("imputer", SimpleImputer(strategy="constant", fill_value="missing")), ("onehot", OneHotEncoder(handle_unknown="ignore")) ] ) preprocess = ColumnTransformer( transformers=[ ("num", numeric_transformer, numeric_features), ("cat", categorical_transformer, categorical_features) ] ) y = df.pop("rings") X_pre = preprocess.fit_transform(df) y_pre = y.to_numpy().reshape(len(y), 1) X = np.concatenate((y_pre, X_pre), axis=1) np.random.shuffle(X) train, validation, test = np.split(X, [int(.7*len(X)), int(.85*len(X))]) pd.DataFrame(train).to_csv(f"{base_dir}/train/train.csv", header=False, index=False) pd.DataFrame(validation).to_csv(f"{base_dir}/validation/validation.csv", header=False, index=False) pd.DataFrame(test).to_csv(f"{base_dir}/test/test.csv", header=False, index=False)
  3. Créer une instance d'un SKLearnProcessor pour la transmettre à l'étape de traitement.

    from sagemaker.sklearn.processing import SKLearnProcessor framework_version = "0.23-1" sklearn_processor = SKLearnProcessor( framework_version=framework_version, instance_type="ml.m5.xlarge", instance_count=processing_instance_count, base_job_name="sklearn-abalone-process", role=role, )
  4. Créer une étape de traitement. Cette étape adopte le SKLearnProcessor, les canaux d'entrée et de sortie, ainsi que le script preprocessing.py que vous avez créé. Cette run méthode est très similaire à celle d'une instance de processeur dans le SDK SageMaker Python. Le paramètre input_data transmis dans ProcessingStep correspond aux données d'entrée de l'étape elle-même. Ces données d'entrée sont utilisées par l'instance du processeur lors de son exécution.

    Notez les canaux nommés "train, "validation et "test" spécifiés dans la configuration de sortie pour la tâche de traitement. Les Properties de l'étape telles que celles-ci peuvent être utilisées dans les étapes suivantes et correspondre à leurs valeurs d'exécution lors de l'exécution.

    from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep step_process = ProcessingStep( name="AbaloneProcess", processor=sklearn_processor, inputs=[ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="abalone/preprocessing.py", )

Étape 4 : définir une étape d'entraînement

Cette section explique comment utiliser l'algorithme SageMaker XGBoost pour entraîner un modèle sur les données d'entraînement issues des étapes de traitement.

Pour définir une étape d'entraînement
  1. Spécifiez le chemin d'accès au modèle dans lequel vous souhaitez enregistrer les modèles de l'entraînement.

    model_path = f"s3://{default_bucket}/AbaloneTrain"
  2. Configurez un estimateur pour l'algorithme XGBoost et le jeu de données en entrée. Le type d'instance d'entraînement est transmis à l'estimateur. Un script d'entraînement classique charge les données à partir des canaux d'entrée, configure l'entraînement avec des hyperparamètres, entraîne un modèle et enregistre un modèle model_dir afin qu'il puisse être hébergé ultérieurement. SageMaker télécharge le modèle sur Amazon S3 sous la forme d'un fichier model.tar.gz à la fin de la formation.

    from sagemaker.estimator import Estimator image_uri = sagemaker.image_uris.retrieve( framework="xgboost", region=region, version="1.0-1", py_version="py3", instance_type="ml.m5.xlarge" ) xgb_train = Estimator( image_uri=image_uri, instance_type="ml.m5.xlarge", instance_count=1, output_path=model_path, role=role, ) xgb_train.set_hyperparameters( objective="reg:linear", num_round=50, max_depth=5, eta=0.2, gamma=4, min_child_weight=6, subsample=0.7, silent=0 )
  3. Créez une TrainingStep à l'aide de l'instance d'estimateur et des propriétés de la ProcessingStep. En particulier, transmettez le S3Uri du canal de sortie "train" et "validation" sur la TrainingStep

    from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep step_train = TrainingStep( name="AbaloneTrain", estimator=xgb_train, inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) }, )

Étape 5 : définir une étape de traitement pour l'évaluation du modèle

Cette section vous explique comment créer une étape de traitement pour évaluer la précision du modèle. Le résultat de cette évaluation de modèle est utilisé dans l'étape de condition pour déterminer le chemin d'exécution à prendre.

Pour définir une étape de traitement pour l'évaluation du modèle
  1. Créez un fichier dans le répertoire /abalone nommé evaluation.py. Ce script est utilisé dans une étape de traitement pour effectuer l'évaluation du modèle. Il prend un modèle entraîné et le jeu de données de test comme entrée, puis produit un fichier JSON contenant des métriques d'évaluation de classification.

    %%writefile abalone/evaluation.py import json import pathlib import pickle import tarfile import joblib import numpy as np import pandas as pd import xgboost from sklearn.metrics import mean_squared_error if __name__ == "__main__": model_path = f"/opt/ml/processing/model/model.tar.gz" with tarfile.open(model_path) as tar: tar.extractall(path=".") model = pickle.load(open("xgboost-model", "rb")) test_path = "/opt/ml/processing/test/test.csv" df = pd.read_csv(test_path, header=None) y_test = df.iloc[:, 0].to_numpy() df.drop(df.columns[0], axis=1, inplace=True) X_test = xgboost.DMatrix(df.values) predictions = model.predict(X_test) mse = mean_squared_error(y_test, predictions) std = np.std(y_test - predictions) report_dict = { "regression_metrics": { "mse": { "value": mse, "standard_deviation": std }, }, } output_dir = "/opt/ml/processing/evaluation" pathlib.Path(output_dir).mkdir(parents=True, exist_ok=True) evaluation_path = f"{output_dir}/evaluation.json" with open(evaluation_path, "w") as f: f.write(json.dumps(report_dict))
  2. Créez une instance de ScriptProcessor qui est utilisée pour créer une ProcessingStep.

    from sagemaker.processing import ScriptProcessor script_eval = ScriptProcessor( image_uri=image_uri, command=["python3"], instance_type="ml.m5.xlarge", instance_count=1, base_job_name="script-abalone-eval", role=role, )
  3. Créez une instance ProcessingStep en utilisant le processeur, les canaux d'entrée et de sortie et le evaluation.py script. En particulier, transmettez la S3ModelArtifacts propriété de l'étape d'step_trainapprentissage, ainsi que celle S3Uri du canal de "test" sortie de l'étape de step_process traitement. Cette run méthode est très similaire à celle d'une instance de processeur dans le SDK SageMaker Python. 

    from sagemaker.workflow.properties import PropertyFile evaluation_report = PropertyFile( name="EvaluationReport", output_name="evaluation", path="evaluation.json" ) step_eval = ProcessingStep( name="AbaloneEval", processor=script_eval, inputs=[ ProcessingInput( source=step_train.properties.ModelArtifacts.S3ModelArtifacts, destination="/opt/ml/processing/model" ), ProcessingInput( source=step_process.properties.ProcessingOutputConfig.Outputs[ "test" ].S3Output.S3Uri, destination="/opt/ml/processing/test" ) ], outputs=[ ProcessingOutput(output_name="evaluation", source="/opt/ml/processing/evaluation"), ], code="abalone/evaluation.py", property_files=[evaluation_report], )

Étape 6 : Définition CreateModelStep d'une transformation par lots

Important

Nous vous recommandons Étape du modèle de l'utiliser pour créer des modèles à partir de la version 2.90.0 du SDK Python SageMaker . CreateModelStepcontinuera de fonctionner dans les versions précédentes du SDK SageMaker Python, mais n'est plus activement pris en charge.

Cette section explique comment créer un SageMaker modèle à partir du résultat de l'étape d'apprentissage. Ce modèle est utilisé pour la transformation par lots sur un nouveau jeu de données. Cette étape est transmise à l'étape de condition et ne s'exécute que si l'étape de condition a la valeur true.

Pour définir une CreateModelStep transformation par lots
  1. Créez un SageMaker modèle. Transmettez la propriété S3ModelArtifacts depuis l'étape d'entraînement step_train.

    from sagemaker.model import Model model = Model( image_uri=image_uri, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=sagemaker_session, role=role, )
  2. Définissez l'entrée du modèle pour votre SageMaker modèle.

    from sagemaker.inputs import CreateModelInput inputs = CreateModelInput( instance_type="ml.m5.large", accelerator_type="ml.eia1.medium", )
  3. Créez votre instance CreateModelStep à l'aide de l'instance de SageMaker modèle CreateModelInput et que vous avez définie.

    from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=model, inputs=inputs, )

Étape 7 : Définir une transformation par lots TransformStep pour effectuer une transformation par lots

Cette section explique comment créer une TransformStep pour effectuer une transformation par lots sur un jeu de données après l'entraînement du modèle. Cette étape est transmise à l'étape de condition et ne s'exécute que si l'étape de condition a la valeur true.

Pour définir un TransformStep pour effectuer une transformation par lots
  1. Créez une instance de transformateur avec le type d'instance de calcul approprié, le nombre d'instances et l'URI de compartiment Amazon S3 de sortie souhaitée. Transmettez la propriété ModelName depuis l'étape step_create_model CreateModel.

    from sagemaker.transformer import Transformer transformer = Transformer( model_name=step_create_model.properties.ModelName, instance_type="ml.m5.xlarge", instance_count=1, output_path=f"s3://{default_bucket}/AbaloneTransform" )
  2. Créez une TransformStep à l'aide de l'instance de transformateur que vous avez définie et du paramètre de pipeline batch_data.

    from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep step_transform = TransformStep( name="AbaloneTransform", transformer=transformer, inputs=TransformInput(data=batch_data) )

Étape 8 : Définition d'une RegisterModel étape pour créer un package modèle

Important

Nous vous recommandons Étape du modèle de l'utiliser pour enregistrer des modèles à partir de la version 2.90.0 du SDK Python SageMaker . RegisterModelcontinuera de fonctionner dans les versions précédentes du SDK SageMaker Python, mais n'est plus activement pris en charge.

Cette section montre comment créer une instance de  RegisterModel. Le résultat de l'exécution de RegisterModel dans un pipeline est un package de modèle. Un package de modèle est une abstraction d'artefacts de modèle réutilisable qui contient tous les ingrédients nécessaires à l'inférence. Il se compose d'une spécification d'inférence qui définit l'image d'inférence à utiliser avec un emplacement de pondération de modèle facultatif. Un groupe de packages de modèles est une collection de packages de modèles. Vous pouvez utiliser a ModelPackageGroup for SageMaker Pipelines pour ajouter une nouvelle version et un nouveau modèle de package au groupe pour chaque exécution de pipeline. Pour de plus amples informations sur le registre de modèles, veuillez consulter Enregistrer et déployer des modèles avec Model Registry.

Cette étape est transmise à l'étape de condition et ne s'exécute que si l'étape de condition a la valeur true.

Pour définir une RegisterModel étape de création d'un package modèle
  • Créez une RegisterModel à l'aide de l'instance d'estimateur que vous avez utilisée pour l'étape d'entraînement. Transmettez la propriété S3ModelArtifacts depuis l'étape d'entraînement step_train et spécifiez un ModelPackageGroup. SageMaker Pipelines crée cela ModelPackageGroup pour vous.

    from sagemaker.model_metrics import MetricsSource, ModelMetrics from sagemaker.workflow.step_collections import RegisterModel model_metrics = ModelMetrics( model_statistics=MetricsSource( s3_uri="{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] ), content_type="application/json" ) ) step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

Étape 9 : définir une étape de condition pour vérifier la précision du modèle

A ConditionStep permet aux SageMaker pipelines de prendre en charge l'exécution conditionnelle dans votre DAG de pipeline en fonction de l'état des propriétés des étapes. Dans ce cas, vous ne souhaitez enregistrer un package de modèles que si la précision de ce modèle, telle que déterminée par l'étape d'évaluation du modèle, dépasse la valeur requise. Si la précision dépasse la valeur requise, le pipeline crée également un SageMaker modèle et exécute une transformation par lots sur un ensemble de données. Cette section explique comment définir l'étape Condition.

Pour définir une étape de condition pour vérifier la précision du modèle
  1. Définissez une condition ConditionLessThanOrEqualToen utilisant la valeur de précision trouvée dans la sortie de l'étape de traitement de l'évaluation du modèle, step_eval. Obtenez cette sortie en utilisant le fichier de propriétés que vous avez indexé à l'étape de traitement et le JSONPath respectif de la valeur d'erreur quadratique moyenne, "mse".

    from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 )
  2. Créez une ConditionStep. Transmettez la condition ConditionEquals, puis définissez les étapes d'enregistrement de package de modèle et de transformation par lots comme les étapes suivantes si la condition est satisfaite.

    step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[], )

Étape 10 : créer un pipeline

Maintenant que vous avez créé toutes les étapes, combinez-les dans un pipeline.

Pour créer un pipeline
  1. Définissez les éléments suivants pour votre pipeline :name, parameters, et steps. Les noms doivent être uniques au sein d'une paire (account, region).

    Note

    Une étape ne peut apparaître qu'une seule fois dans la liste des étapes du pipeline ou dans les listes d'étapes if/else de l'étape de condition. Elle ne peut pas apparaître dans les deux.

    from sagemaker.workflow.pipeline import Pipeline pipeline_name = f"AbalonePipeline" pipeline = Pipeline( name=pipeline_name, parameters=[ processing_instance_count, model_approval_status, input_data, batch_data, ], steps=[step_process, step_train, step_eval, step_cond], )
  2. (Facultatif) Examinez la définition de pipeline JSON pour vous assurer qu'elle est bien formée.

    import json json.loads(pipeline.definition())

Cette définition de pipeline est prête à être soumise à SageMaker. Dans le didacticiel suivant, vous allez soumettre ce pipeline SageMaker et démarrer une exécution.

Étape suivante : Exécuter un pipeline