Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Créez un pipeline avec des @step
fonctions décorées
Vous pouvez créer un pipeline en convertissant les fonctions Python en étapes de pipeline à l'aide du @step
décorateur, en créant des dépendances entre ces fonctions pour créer un graphe de pipeline (ou un graphe acyclique dirigé (DAG)) et en transmettant les nœuds foliaires de ce graphe sous forme de liste d'étapes au pipeline. Les sections suivantes expliquent cette procédure en détail à l'aide d'exemples.
Rubriques
Convertir une fonction en étape
Pour créer une étape à l'aide du @step
décorateur, annotez la fonction avec. @step
L'exemple suivant montre une fonction @step
décorée qui prétraite les données.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
Lorsque vous invoquez une fonction @step
décorée, SageMaker renvoie une DelayedReturn
instance au lieu d'exécuter la fonction. Une DelayedReturn
instance est un proxy pour le retour réel de cette fonction. L'DelayedReturn
instance peut être transmise à une autre fonction en tant qu'argument ou directement à une instance de pipeline en tant qu'étape. Pour plus d'informations sur la DelayedReturn
classe, consultez sagemaker.workflow.function_step. DelayedReturn
Créez des dépendances entre les étapes
Lorsque vous créez une dépendance entre deux étapes, vous créez une connexion entre les étapes de votre graphe de pipeline. Les sections suivantes présentent plusieurs manières de créer une dépendance entre les étapes de votre pipeline.
Dépendances des données via des arguments d'entrée
Le fait de transmettre la DelayedReturn
sortie d'une fonction en entrée à une autre fonction crée automatiquement une dépendance aux données dans le pipelineDAG. Dans l'exemple suivant, le transfert de la DelayedReturn
sortie de la preprocess
fonction à la train
fonction crée une dépendance entre preprocess
ettrain
.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
L'exemple précédent définit une fonction d'entraînement décorée avec@step
. Lorsque cette fonction est invoquée, elle reçoit le DelayedReturn
résultat de l'étape du pipeline de prétraitement en entrée. L'appel de la fonction d'entraînement renvoie une autre DelayedReturn
instance. Cette instance contient les informations sur toutes les étapes précédentes définies dans cette fonction (c'est-à-dire l'preprocess
étape de cet exemple) qui forment le pipelineDAG.
Dans l'exemple précédent, la preprocess
fonction renvoie une valeur unique. Pour les types de retour plus complexes tels que les listes ou les tuples, reportez-vous àLimites.
Définissez des dépendances personnalisées
Dans l'exemple précédent, la train
fonction a reçu le DelayedReturn
résultat de preprocess
et a créé une dépendance. Si vous souhaitez définir la dépendance de manière explicite sans transmettre le résultat de l'étape précédente, utilisez la add_depends_on
fonction avec l'étape. Vous pouvez utiliser la get_step()
fonction pour récupérer l'étape sous-jacente depuis son DelayedReturn
instance, puis appeler add_depends_on
_on avec la dépendance en entrée. Pour consulter la définition de la get_step()
fonction, consultez sagemaker.workflow.step_outputs.get_steppreprocess
et train
en utilisant get_step()
etadd_depends_on()
.
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
Transmettre des données depuis et vers une fonction @step
décorée vers une étape de pipeline traditionnelle
Vous pouvez créer un pipeline qui inclut une étape @step
décorée et une étape de pipeline traditionnelle et qui transmet des données entre elles. Par exemple, vous pouvez l'utiliser ProcessingStep
pour traiter les données et transmettre le résultat à la fonction d'entraînement @step
-decorated. Dans l'exemple suivant, une étape d'apprentissage @step
décorée fait référence au résultat d'une étape de traitement.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
À utiliser ConditionStep
avec des @step
marches décorées
Pipelines prend en charge une ConditionStep
classe qui évalue les résultats des étapes précédentes pour décider de l'action à entreprendre dans le pipeline. Vous pouvez également l'utiliser ConditionStep
avec un @step
marchepied décoré. Pour utiliser le résultat de n'importe quelle étape @step
décorée avecConditionStep
, entrez le résultat de cette étape en tant qu'argument deConditionStep
. Dans l'exemple suivant, l'étape de condition reçoit le résultat de l'étape d'évaluation du modèle @step
-decorated.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
Définir un pipeline à l'aide du DelayedReturn
résultat des étapes
Vous définissez un pipeline de la même manière, que vous utilisiez ou non un @step
décorateur. Lorsque vous transmettez une DelayedReturn
instance à votre pipeline, il n'est pas nécessaire de passer la liste complète des étapes pour créer le pipeline. Il déduit SDK automatiquement les étapes précédentes en fonction des dépendances que vous définissez. Toutes les étapes précédentes des Step
objets que vous avez passés au pipeline ou aux DelayedReturn
objets sont incluses dans le graphique du pipeline. Dans l'exemple suivant, le pipeline reçoit l'DelayedReturn
objet de la train
fonction. SageMaker ajoute l'preprocess
étape, en tant qu'étape précédente detrain
, au graphe de pipeline.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
S'il n'existe aucune donnée ou dépendance personnalisée entre les étapes et que vous exécutez plusieurs étapes en parallèle, le graphe du pipeline comporte plusieurs nœuds foliaires. Transmettez tous ces nœuds foliaires d'une liste à l'steps
argument de votre définition de pipeline, comme indiqué dans l'exemple suivant :
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
Lorsque le pipeline fonctionne, les deux étapes s'exécutent en parallèle.
Vous transmettez uniquement les nœuds foliaires du graphe au pipeline, car ils contiennent des informations sur toutes les étapes précédentes définies par le biais de données ou de dépendances personnalisées. Lorsqu'il compile le pipeline, il SageMaker déduit également toutes les étapes suivantes qui forment le graphe du pipeline et ajoute chacune d'elles en tant qu'étape distincte au pipeline.
Crée un pipeline.
Créez un pipeline en appelantpipeline.create()
, comme indiqué dans l'extrait suivant. Pour plus de détailscreate()
, voir SageMaker.Workflow.Pipeline.Pipeline.Create
role = "
pipeline-role
" pipeline.create(role)
Lorsque vous appelezpipeline.create()
, SageMaker compile toutes les étapes définies dans le cadre de l'instance de pipeline. SageMaker télécharge la fonction sérialisée, les arguments et tous les autres artefacts liés à l'étape sur Amazon S3.
Les données résident dans le compartiment S3 selon la structure suivante :
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
est défini dans le fichier de SageMaker configuration et s'applique à l'ensemble du pipeline. S'il n'est pas défini, le SageMaker compartiment par défaut est utilisé.
Note
Chaque fois qu'un pipeline est SageMaker compilé, il SageMaker enregistre les fonctions sérialisées, les arguments et les dépendances des étapes dans un dossier horodaté avec l'heure actuelle. Cela se produit chaque fois que vous courez pipeline.create()
pipeline.update()
, pipeline.upsert()
oupipeline.definition()
.