Créez un pipeline avec des @step fonctions décorées - Amazon SageMaker

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Créez un pipeline avec des @step fonctions décorées

Vous pouvez créer un pipeline en convertissant les fonctions Python en étapes de pipeline à l'aide du @step décorateur, en créant des dépendances entre ces fonctions pour créer un graphe de pipeline (ou un graphe acyclique dirigé (DAG)) et en transmettant les nœuds foliaires de ce graphe sous forme de liste d'étapes au pipeline. Les sections suivantes expliquent cette procédure en détail à l'aide d'exemples.

Convertir une fonction en étape

Pour créer une étape à l'aide du @step décorateur, annotez la fonction avec. @step L'exemple suivant montre une fonction @step décorée qui prétraite les données.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

Lorsque vous invoquez une fonction @step décorée, SageMaker renvoie une DelayedReturn instance au lieu d'exécuter la fonction. Une DelayedReturn instance est un proxy pour le retour réel de cette fonction. L'DelayedReturninstance peut être transmise à une autre fonction en tant qu'argument ou directement à une instance de pipeline en tant qu'étape. Pour plus d'informations sur la DelayedReturn classe, consultez sagemaker.workflow.function_step. DelayedReturn.

Lorsque vous créez une dépendance entre deux étapes, vous créez une connexion entre les étapes de votre graphe de pipeline. Les sections suivantes présentent plusieurs manières de créer une dépendance entre les étapes de votre pipeline.

Le fait de transmettre la DelayedReturn sortie d'une fonction en entrée à une autre fonction crée automatiquement une dépendance aux données dans le pipelineDAG. Dans l'exemple suivant, le transfert de la DelayedReturn sortie de la preprocess fonction à la train fonction crée une dépendance entre preprocess ettrain.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

L'exemple précédent définit une fonction d'entraînement décorée avec@step. Lorsque cette fonction est invoquée, elle reçoit le DelayedReturn résultat de l'étape du pipeline de prétraitement en entrée. L'appel de la fonction d'entraînement renvoie une autre DelayedReturn instance. Cette instance contient les informations sur toutes les étapes précédentes définies dans cette fonction (c'est-à-dire l'preprocessétape de cet exemple) qui forment le pipelineDAG.

Dans l'exemple précédent, la preprocess fonction renvoie une valeur unique. Pour les types de retour plus complexes tels que les listes ou les tuples, reportez-vous àLimites.

Dans l'exemple précédent, la train fonction a reçu le DelayedReturn résultat de preprocess et a créé une dépendance. Si vous souhaitez définir la dépendance de manière explicite sans transmettre le résultat de l'étape précédente, utilisez la add_depends_on fonction avec l'étape. Vous pouvez utiliser la get_step() fonction pour récupérer l'étape sous-jacente depuis son DelayedReturn instance, puis appeler add_depends_on _on avec la dépendance en entrée. Pour consulter la définition de la get_step() fonction, consultez sagemaker.workflow.step_outputs.get_step. L'exemple suivant montre comment créer une dépendance entre preprocess et train en utilisant get_step() etadd_depends_on().

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

Vous pouvez créer un pipeline qui inclut une étape @step décorée et une étape de pipeline traditionnelle et qui transmet des données entre elles. Par exemple, vous pouvez l'utiliser ProcessingStep pour traiter les données et transmettre le résultat à la fonction d'entraînement @step -decorated. Dans l'exemple suivant, une étape d'apprentissage @step décorée fait référence au résultat d'une étape de traitement.

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

À utiliser ConditionStep avec des @step marches décorées

Pipelines prend en charge une ConditionStep classe qui évalue les résultats des étapes précédentes pour décider de l'action à entreprendre dans le pipeline. Vous pouvez également l'utiliser ConditionStep avec un @step marchepied décoré. Pour utiliser le résultat de n'importe quelle étape @step décorée avecConditionStep, entrez le résultat de cette étape en tant qu'argument deConditionStep. Dans l'exemple suivant, l'étape de condition reçoit le résultat de l'étape d'évaluation du modèle @step -decorated.

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

Définir un pipeline à l'aide du DelayedReturn résultat des étapes

Vous définissez un pipeline de la même manière, que vous utilisiez ou non un @step décorateur. Lorsque vous transmettez une DelayedReturn instance à votre pipeline, il n'est pas nécessaire de passer la liste complète des étapes pour créer le pipeline. Il déduit SDK automatiquement les étapes précédentes en fonction des dépendances que vous définissez. Toutes les étapes précédentes des Step objets que vous avez passés au pipeline ou aux DelayedReturn objets sont incluses dans le graphique du pipeline. Dans l'exemple suivant, le pipeline reçoit l'DelayedReturnobjet de la train fonction. SageMaker ajoute l'preprocessétape, en tant qu'étape précédente detrain, au graphe de pipeline.

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

S'il n'existe aucune donnée ou dépendance personnalisée entre les étapes et que vous exécutez plusieurs étapes en parallèle, le graphe du pipeline comporte plusieurs nœuds foliaires. Transmettez tous ces nœuds foliaires d'une liste à l'stepsargument de votre définition de pipeline, comme indiqué dans l'exemple suivant :

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

Lorsque le pipeline fonctionne, les deux étapes s'exécutent en parallèle.

Vous transmettez uniquement les nœuds foliaires du graphe au pipeline, car ils contiennent des informations sur toutes les étapes précédentes définies par le biais de données ou de dépendances personnalisées. Lorsqu'il compile le pipeline, il SageMaker déduit également toutes les étapes suivantes qui forment le graphe du pipeline et ajoute chacune d'elles en tant qu'étape distincte au pipeline.

Crée un pipeline.

Créez un pipeline en appelantpipeline.create(), comme indiqué dans l'extrait suivant. Pour plus de détailscreate(), voir SageMaker.Workflow.Pipeline.Pipeline.Create.

role = "pipeline-role" pipeline.create(role)

Lorsque vous appelezpipeline.create(), SageMaker compile toutes les étapes définies dans le cadre de l'instance de pipeline. SageMaker télécharge la fonction sérialisée, les arguments et tous les autres artefacts liés à l'étape sur Amazon S3.

Les données résident dans le compartiment S3 selon la structure suivante :

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_uriest défini dans le fichier de SageMaker configuration et s'applique à l'ensemble du pipeline. S'il n'est pas défini, le SageMaker compartiment par défaut est utilisé.

Note

Chaque fois qu'un pipeline est SageMaker compilé, il SageMaker enregistre les fonctions sérialisées, les arguments et les dépendances des étapes dans un dossier horodaté avec l'heure actuelle. Cela se produit chaque fois que vous courez pipeline.create()pipeline.update(), pipeline.upsert() oupipeline.definition().