Schritte zur Pipeline - Amazon SageMaker

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Schritte zur Pipeline

SageMaker Pipelines bestehen aus Stufen. Diese Schritte definieren die Aktionen, die die Pipeline ausführt, und die Beziehungen zwischen den Schritten mithilfe von Eigenschaften.

Schritttypen

Im Folgenden werden die Anforderungen der einzelnen Schrittarten beschrieben und ein Beispiel für die Implementierung des Schritts bereitgestellt. Dies sind keine funktionalen Implementierungen, da sie nicht die benötigten Ressourcen und Eingaben bereitstellen. Ein Tutorial zur Implementierung dieser Schritte finden Sie unter SageMaker Pipelines erstellen und verwalten.

Amazon SageMaker Model Building Pipelines unterstützen die folgenden Schritttypen:

Verarbeitungsschritt

Verwenden Sie einen Verarbeitungsschritt, um einen Verarbeitungsauftrag für die Datenverarbeitung zu erstellen. Weitere Informationen zur Verarbeitung von Aufträgen finden Sie unter Daten verarbeiten und Modelle auswerten.

Ein Verarbeitungsschritt erfordert einen Prozessor, ein Python-Skript, das den Verarbeitungscode, die Ausgaben für die Verarbeitung und die Jobargumente definiert. Im folgenden Beispiel wird gezeigt, wie Sie eineProcessingStep Definition erstellen.

from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='1.0-1', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep inputs = [ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] step_process = ProcessingStep( name="AbaloneProcess", step_args = sklearn_processor.run(inputs=inputs, outputs=outputs, code="abalone/preprocessing.py") )

Laufzeitparameter übergeben

Das folgende Beispiel zeigt, wie Laufzeitparameter von einem PySpark Prozessor an einen übergebenProcessingStep werden.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.spark.processing import PySparkProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep pipeline_session = PipelineSession() pyspark_processor = PySparkProcessor( framework_version='2.4', role=<role>, instance_type='ml.m5.xlarge', instance_count=1, sagemaker_session=pipeline_session, ) step_args = pyspark_processor.run( inputs=[ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="preprocess.py", arguments=None, ) step_process = ProcessingStep( name="AbaloneProcess", step_args=step_args, )

Weitere Informationen zu den Anforderungen an die Verarbeitungsschritte finden Sie unter sagemaker.workflow.steps. ProcessingStepdokumentation. Ein ausführliches Beispiel finden Sie unter Definieren Sie einen Verarbeitungsschritt für Feature Engineering im Beispielnotizbuch Orchestrate Jobs to Train and Evaluate Models with Amazon SageMaker Pipelines.

Schritt für Schritt

Sie verwenden einen Trainingsschritt, um einen Trainingsjob zum Trainieren eines Modells zu erstellen. Weitere Informationen zu Trainingsjobs finden Sie unter Train a Model with Amazon SageMaker.

Ein Trainingsschritt erfordert einen Schätzer sowie Eingaben von Trainings- und Validierungsdaten. Im folgenden Beispiel wird gezeigt, wie Sie eineTrainingStep Definition erstellen. Weitere Informationen zu den Anforderungen an die Trainingsschritte finden Sie unter sagemaker.workflow.steps. TrainingStepdokumentation.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep from sagemaker.xgboost.estimator import XGBoost pipeline_session = PipelineSession() xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session) step_args = xgb_estimator.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } ) step_train = TrainingStep( name="TrainAbaloneModel", step_args=step_args, )

Optimierungsschritt

Sie verwenden einen Optimierungsschritt, um einen Hyperparameter-Optimierungsjob zu erstellen, der auch als Hyperparameter-Optimierung (HPO) bezeichnet wird. Bei einem Hyperparameter-Tuning-Job werden mehrere Trainingsjobs ausgeführt, von denen jeder eine Modellversion erzeugt. Weitere Informationen zur Optimierung von Hyperparametern finden Sie unterFühren Sie eine automatische Modelloptimierung durch mit SageMaker.

Der Tuning-Job ist mit dem SageMaker Experiment für die Pipeline verknüpft, wobei die Trainingsjobs als Versuche geschaffen wurden. Weitere Informationen finden Sie unter Integration von Experimenten.

Ein Tuning-Schritt erfordert einen HyperparameterTunerund Trainingseingaben. Sie können vorherige Tuning-Jobs erneut trainieren, indem Sie denwarm_start_config Parameter von angebenHyperparameterTuner. Weitere Informationen zur Optimierung von Hyperparametern und zum Warmstart finden Sie unterDurchführen eines Hyperparameter-Optimierungsauftrags mit Warmstart.

Sie verwenden die Methode get_top_model_s3_uri der sagemaker.workflow.steps. TuningStepKlasse, um das Modellartefakt aus einer der leistungsstärksten Modellversionen zu erhalten. Ein Notizbuch, das zeigt, wie ein Tuning-Schritt in einer SageMaker Pipeline verwendet wird, finden Sie unter sagemaker-pipelines-tuning-step.ipynb.

Wichtig

Optimierungsschritte wurden in Amazon SageMaker Python SDK v2.48.0 und Amazon SageMaker Studio v3.8.0 eingeführt. Sie müssen Studio aktualisieren, bevor Sie einen Optimierungsschritt verwenden, sonst wird die Pipeline-DAG nicht angezeigt. Informationen zum Aktualisieren von Studio finden Sie unter SageMaker Studio herunterfahren und aktualisieren.

Im folgenden Beispiel wird gezeigt, wie Sie eineTuningStep Definition erstellen.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession()) step_tuning = TuningStep( name = "HPTuning", step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://my-bucket/my-data")) )

Holen Sie sich die beste Modellversion

Im folgenden Beispiel wird gezeigt, wie Sie mitget_top_model_s3_uri dieser Methode die beste Modellversion aus dem Tuning-Job herausholen. Maximal sind die 50 leistungsstärksten Versionen verfügbar, geordnet nach HyperParameterTuningJobObjective. Dastop_k Argument ist ein Index der Versionen, wobeitop_k=0 es sich um die Version mit der besten Leistung und um die Version mit der schlechtesten Leistungtop_k=49 handelt.

best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )

Weitere Informationen zu den Anforderungen für Optimierungsschritte finden Sie unter sagemaker.workflow.steps. TuningStepdokumentation.

AutoML-Schritt

Verwenden Sie die AutoML-API, um einen AutoML-Job zu erstellen, um ein Modell automatisch zu trainieren. Weitere Informationen zu AutoML-Jobs finden Sie unter Automatisieren der Modellentwicklung mit Amazon SageMaker Autopilot.

Anmerkung

Derzeit unterstützt der AutoML-Schritt nur den Ensembling-Trainingsmodus.

Im folgenden Beispiel wird gezeigt, wie eine Definition erstellt wird mitAutoMLStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.automl_step import AutoMLStep pipeline_session = PipelineSession() auto_ml = AutoML(..., role="<role>", target_attribute_name="my_target_attribute_name", mode="ENSEMBLING", sagemaker_session=pipeline_session) input_training = AutoMLInput( inputs="s3://my-bucket/my-training-data", target_attribute_name="my_target_attribute_name", channel_type="training", ) input_validation = AutoMLInput( inputs="s3://my-bucket/my-validation-data", target_attribute_name="my_target_attribute_name", channel_type="validation", ) step_args = auto_ml.fit( inputs=[input_training, input_validation] ) step_automl = AutoMLStep( name="AutoMLStep", step_args=step_args, )

Holen Sie sich die beste Modellversion

Der AutoML-Schritt trainiert automatisch mehrere Modellkandidaten. Sie können das Modell mit der besten Zielmetrik aus dem AutoML-Job abrufen, indem Sie dieget_best_auto_ml_model Methode und einen IAM für denrole Zugriff auf Modellartefakte wie folgt verwenden.

best_model = step_automl.get_best_auto_ml_model(role=<role>)

Weitere Informationen finden Sie unter AutoML-Schritt im SageMaker Python-SDK.

Modell Step

Verwenden Sie aModelStep, um ein SageMaker Modell zu erstellen oder zu registrieren. Weitere Informationen zu denModelStep Anforderungen finden Sie unter sagemaker.workflow.model_step. ModelStepdokumentation.

Erstellen eines Modells

Sie können ein verwendenModelStep, um ein SageMaker Modell zu erstellen. AModelStep benötigt Modellartefakte und Informationen über den SageMaker Instanztyp, den Sie zum Erstellen des Modells verwenden müssen. Weitere Informationen zu SageMaker Modellen finden Sie unter Train a Model with Amazon SageMaker.

Im folgenden Beispiel wird gezeigt, wie Sie eineModelStep Definition erstellen.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.model import Model from sagemaker.workflow.model_step import ModelStep step_train = TrainingStep(...) model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=PipelineSession(), role=role, ) step_model_create = ModelStep( name="MyModelCreationStep", step_args=model.create(instance_type="ml.m5.xlarge"), )

Registrieren eines Modells

Sie können a verwendenModelStep, um asagemaker.model.Model oder asagemaker.pipeline.PipelineModel bei der SageMaker Amazon-Modellregistrierung zu registrieren. APipelineModel steht für eine Inferenzpipeline, bei der es sich um ein Modell handelt, das aus einer linearen Abfolge von Containern besteht, die Inferenzanfragen verarbeiten. Weitere Informationen über die Registrierung eines Modells finden Sie unterModelle mit Model Registry registrieren und bereitstellen.

Das folgende Beispiel zeigt, wie ein erstellt wirdModelStep, das a registriertPipelineModel.

import time from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel pipeline_session = PipelineSession() code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel( model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=pipeline_session, py_version='py3' ) xgboost_model = XGBoostModel( model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=pipeline_session, role=role ) from sagemaker.workflow.model_step import ModelStep from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model, xgboost_model], role=role,sagemaker_session=pipeline_session, ) register_model_step_args = pipeline_model.register( content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', ) step_model_registration = ModelStep( name="AbaloneRegisterModel", step_args=register_model_step_args, )

CreateModel Schritt

Wichtig

Wir empfehlenModell Step die Verwendung zum Erstellen von Modellen ab Version 2.90.0 des SageMaker Python SDK. CreateModelStepfunktioniert weiterhin in früheren Versionen des SageMaker Python SDK, wird aber nicht mehr aktiv unterstützt.

Sie verwenden einenCreateModel Schritt, um ein SageMaker Modell zu erstellen. Weitere Informationen zu SageMaker Modellen finden Sie unter Train a Model with Amazon SageMaker.

Ein Schritt zum Erstellen eines Modells erfordert Modellartefakte und Informationen über den SageMaker Instanztyp, den Sie zum Erstellen des Modells verwenden müssen. Im folgenden Beispiel wird gezeigt, wie eineCreateModel Schrittdefinition erstellt wird. Weitere Informationen zu denCreateModel Schrittanforderungen finden Sie unter sagemaker.workflow.steps. CreateModelStepdokumentation.

from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )

RegisterModel Schritt

Wichtig

Wir empfehlenModell Step, Modelle ab Version 2.90.0 des SageMaker Python SDK zu registrieren. RegisterModelfunktioniert weiterhin in früheren Versionen des SageMaker Python SDK, wird aber nicht mehr aktiv unterstützt.

Sie verwenden einenRegisterModel Schritt, um ein SageMaker.Model.Model oder eine Sagemaker.pipeline zu registrieren. PipelineModelmit der SageMaker Amazon-Modellregistrierung. APipelineModel steht für eine Inferenzpipeline, bei der es sich um ein Modell handelt, das aus einer linearen Abfolge von Containern besteht, die Inferenzanfragen verarbeiten.

Weitere Informationen über die Registrierung eines Modells finden Sie unterModelle mit Model Registry registrieren und bereitstellen. Weitere Informationen zu denRegisterModel Schrittanforderungen finden Sie in den sagemaker.workflow.step_collections. RegisterModeldokumentation.

Das folgende Beispiel zeigt, wie einRegisterModel Schritt erstellt wird, der eine registriertPipelineModel.

import time from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=sagemaker_session, py_version='py3') xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=sagemaker_session, role=role) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session) step_register = RegisterModel( name="AbaloneRegisterModel", model=pipeline_model, content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', )

Wennmodel nicht angegeben, benötigt der Registermodellschritt einen Schätzer, wie im folgenden Beispiel gezeigt.

from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

Transformationsschritt

Sie verwenden einen Transformationsschritt für die Batch-Transformation, um die Inferenz für einen gesamten Datensatz auszuführen. Weitere Hinweise zur Stapeltransformation finden Sie unterAusführen von Stapeltransformationen mit Inferenz-Pipelines.

Ein Transformationsschritt erfordert einen Transformator und die Daten, für die die Batch-Transformation ausgeführt werden soll. Im folgenden Beispiel wird gezeigt, wie eineTransform Schrittdefinition erstellt wird. Weitere Informationen zu denTransform Schrittanforderungen finden Sie unter sagemaker.workflow.steps. TransformStepdokumentation.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.transformer import Transformer from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep transformer = Transformer(..., sagemaker_session=PipelineSession()) step_transform = TransformStep( name="AbaloneTransform", step_args=transformer.transform(data="s3://my-bucket/my-data"), )

Bedingungsschritt

Sie verwenden einen Bedingungsschritt, um den Zustand der Schritteigenschaften zu bewerten und zu beurteilen, welche Aktion als Nächstes in der Pipeline ergriffen werden sollte.

Ein Bedingungsschritt erfordert eine Liste von Bedingungen, eine Liste von Schritten, die ausgeführt werden müssen, wenn die Bedingung erfüllt isttrue, und eine Liste von Schritten, die ausgeführt werden müssen, wenn die Bedingung entsprechend ausgewertet wirdfalse. Im folgenden Beispiel wird gezeigt, wie Sie eineConditionStep Definition erstellen.

Einschränkungen

  • SageMaker Pipelines unterstützt die Verwendung verschachtelter Bedingungsschritte nicht. Sie können einen Bedingungsschritt nicht als Eingabe für einen anderen Bedingungsschritt übergeben.

  • Ein Bedingungsschritt kann nicht in beiden Zweigen identische Schritte verwenden. Wenn Sie in beiden Zweigen dieselbe Schrittfunktionalität benötigen, duplizieren Sie den Schritt und geben Sie ihm einen anderen Namen.

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ( ConditionStep, JsonGet ) cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )

Weitere Informationen zu denConditionStep Anforderungen finden Sie im sagemaker.workflow.condition_step. ConditionStepAPI-Referenz. Weitere Informationen zu den unterstützten Bedingungen finden Sie unter Amazon SageMaker Model Building Pipelines — Conditions in der SageMaker Python-SDK-Dokumentation.

Rückruf-Schritt

Sie verwenden einenCallback Schritt, um zusätzliche Prozesse undAWS Services in Ihren Workflow zu integrieren, die nicht direkt von Amazon SageMaker Model Building Pipelines bereitgestellt werden. Wenn einCallback Schritt ausgeführt wird, wird das folgende Verfahren ausgeführt:

  • SageMaker Pipelines sendet eine Nachricht an eine vom Kunden angegebene Amazon Simple Queue Service (Amazon SQS) -Warteschlange. Die Nachricht enthält ein von SageMaker Pipelines generiertes Token und eine vom Kunden bereitgestellte Liste von Eingabeparametern. Nach dem Senden der Nachricht wartet SageMaker Pipelines auf eine Antwort des Kunden.

  • Der Kunde ruft die Nachricht aus der Amazon SQS-Warteschlange ab und startet seinen benutzerdefinierten Prozess.

  • Wenn der Prozess abgeschlossen ist, ruft der Kunde eine der folgenden APIs auf und übermittelt das von den SageMaker Pipelines generierte Token:

  • Der API-Aufruf bewirkt, dass SageMaker Pipelines entweder den Pipeline-Prozess fortsetzen oder den Prozess nicht bestehen lassen.

Weitere Informationen zu denCallback Schrittanforderungen finden Sie im sagemaker.workflow.callback_step. CallbackStepdokumentation. Eine vollständige Lösung finden Sie unter Erweitern von SageMaker Pipelines um benutzerdefinierte Schritte mithilfe von Callback-Schritten.

Wichtig

CallbackSchritte wurden in Amazon SageMaker Python SDK v2.45.0 und Amazon SageMaker Studio v3.6.2 eingeführt. Sie müssen Studio aktualisieren, bevor Sie einenCallback Schritt verwenden, sonst wird die Pipeline-DAG nicht angezeigt. Informationen zum Aktualisieren von Studio finden Sie unter SageMaker Studio herunterfahren und aktualisieren.

Das folgende Beispiel zeigt eine Implementierung des vorherigen Verfahrens.

from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
Anmerkung

Die Ausgabeparameter fürCallbackStep sollten nicht verschachtelt sein. Wenn Sie beispielsweise ein verschachteltes Wörterbuch als Ausgabeparameter verwenden, wird das Wörterbuch als einzelne Zeichenfolge behandelt (z. B. {"output1": "{\"nested_output1\":\"my-output\"}"}). Wenn Sie einen verschachtelten Wert angeben, wird beim Versuch, auf einen bestimmten Ausgabeparameter zu verweisen, ein Client-Fehler ausgelöst, der nicht wiederholt werden kann.

Verhalten stoppen

Ein Pipeline-Prozess stoppt nicht, während einCallback Schritt ausgeführt wird.

Wenn Sie einen Pipeline-Prozess mit einem laufendenCallback Schritt aufrufen StopPipelineExecution, sendet SageMaker Pipelines eine zusätzliche Amazon SQS-Nachricht an die angegebene SQS-Warteschlange. Der Text der SQS-Nachricht enthält ein Statusfeld, das auf gesetzt istStopping. Im Folgenden sehen Sie ein Beispiel für einen SQS-Nachrichtentext.

{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }

Sie sollten Ihrem Amazon SQS-Nachrichtenkonsumer Logik hinzufügen, um nach Erhalt der Nachricht alle erforderlichen Maßnahmen zu ergreifen (z. B. Ressourcenbereinigung), gefolgt von einem Anruf anSendPipelineExecutionStepSuccess oderSendPipelineExecutionStepFailure.

Erst wenn SageMaker Pipelines einen dieser Aufrufe erhält, stoppt es den Pipeline-Prozess.

Lambda-Schritt

Sie verwenden einen Lambda-Schritt, um eineAWS Lambda Funktion auszuführen. Sie können eine vorhandene Lambda-Funktion ausführen oder SageMaker eine neue Lambda-Funktion erstellen und ausführen. Ein Notizbuch, das zeigt, wie ein Lambda-Schritt in einer SageMaker Pipeline verwendet wird, finden Sie unter sagemaker-pipelines-lambda-step.ipynb.

Wichtig

Lambda-Schritte wurden in Amazon SageMaker Python SDK v2.51.0 und Amazon SageMaker Studio v3.9.1 eingeführt. Sie müssen Studio aktualisieren, bevor Sie einen Lambda-Schritt verwenden, sonst wird die Pipeline-DAG nicht angezeigt. Informationen zum Aktualisieren von Studio finden Sie unter SageMaker Studio herunterfahren und aktualisieren.

SageMaker stellt die SageMaker.Lambda_Helper.Lambda-Klasse zum Erstellen, Aktualisieren, Aufrufen und Löschen von Lambda-Funktionen bereit. Lambdahat die folgende Signatur.

Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )

Der sagemaker.workflow.lambda_step. LambdaStepDie Klasse hat einlambda_func Argument vom TypLambda. Um eine vorhandene Lambda-Funktion aufzurufen, müssen Sie lediglich den Amazon-Ressourcennamen (ARN) der Funktion angebenfunction_arn. Wenn Sie keinen Wert für angebenfunction_arnhandler, müssen Sie eine der folgenden Optionen angeben:

  • zipped_code_dir— Der Pfad der gezippten Lambda-Funktion

    s3_bucket— Amazon S3-Bucket, wo hochgeladen werdenzipped_code_dir soll

  • script— Der Pfad der Lambda-Funktionsskriptdatei

Das folgende Beispiel zeigt, wie eineLambda Schrittdefinition erstellt wird, die eine vorhandene Lambda-Funktion aufruft.

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

Das folgende Beispiel zeigt, wie Sie eineLambda Schrittdefinition erstellen, die mithilfe eines Lambda-Funktionsskripts eine Lambda-Funktion erstellt und aufruft.

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

Eingänge und Ausgänge

Wenn IhreLambda Funktion Eingänge oder Ausgänge hat, müssen diese ebenfalls in IhremLambda Schritt definiert werden.

Anmerkung

Eingabe- und Ausgabeparameter sollten nicht verschachtelt sein. Wenn Sie beispielsweise ein verschachteltes Wörterbuch als Ausgabeparameter verwenden, wird das Wörterbuch als einzelne Zeichenfolge behandelt (z. B. {"output1": "{\"nested_output1\":\"my-output\"}"}). Wenn Sie einen verschachtelten Wert angeben und später versuchen, darauf zu verweisen, wird ein Client-Fehler ausgegeben, der nicht erneut versucht werden kann.

Bei der Definition desLambda Schrittsinputs muss es sich um ein Wörterbuch mit Schlüssel-Wert-Paaren handeln. Jeder Wert desinputs Wörterbuchs muss ein primitiver Typ sein (Zeichenfolge, Ganzzahl oder Float). Verschachtelte Objekte werden nicht unterstützt. Wenn der Wert undefiniert bleibt, wird standardmäßig derinputs Wert auf gesetztNone.

Deroutputs Wert muss eine Liste von Schlüsseln sein. Diese Schlüssel beziehen sich auf ein Wörterbuch, das in der Ausgabe derLambda Funktion definiert ist. Beiinputs diesen Schlüsseln muss es sich beispielsweise um primitive Typen handeln, und verschachtelte Objekte werden nicht unterstützt.

Timeout und Stoppverhalten

DieLambda Klasse hat eintimeout Argument, das die maximale Zeit angibt, für die die Lambda-Funktion ausgeführt werden kann. Der Standardwert ist 120 Sekunden mit einem Höchstwert von 10 Minuten. Wenn die Lambda-Funktion ausgeführt wird, wenn das Timeout erreicht ist, schlägt der Lambda-Schritt fehl. Die Lambda-Funktion wird jedoch weiter ausgeführt.

Ein Pipeline-Prozess kann nicht gestoppt werden, während ein Lambda-Schritt ausgeführt wird, da die durch den Lambda-Schritt aufgerufene Lambda-Funktion nicht gestoppt werden kann. Wenn Sie versuchen, den Prozess zu stoppen, während die Lambda-Funktion ausgeführt wird, wartet die Pipeline, bis die Lambda-Funktion beendet ist oder bis das Timeout erreicht wird, je nachdem, was zuerst eintritt, und stoppt dann. Wenn die Lambda-Funktion abgeschlossen ist, lautet der Status des Pipeline-ProzessesStopped. Wenn das Timeout erreicht wird, lautet der Status des Pipeline-ProzessesFailed.

ClarifyCheck Schritt

Sie können diesenClarifyCheck Schritt verwenden, um Abweichungen von Ausgangslinien anhand früherer Basislinien zu überprüfen, um Verzerrungsanalysen und die Erklärbarkeit des Modells zu gewährleisten. Anschließend können Sie Ihre Baselines mit dermodel.register() Methode generieren und registrieren und die Ausgabe dieser Methode anModell Step using übergebenstep_args. Diese Baselines für die Drift-Prüfung können von Amazon SageMaker Model Monitor für Ihre Modellendpunkte verwendet werden, sodass Sie nicht separat einen Basislinienvorschlag erstellen müssen. In diesemClarifyCheck Schritt können auch Basiswerte für die Drift-Prüfung aus der Modellregistrierung abgerufen werden. DieserClarifyCheck Schritt nutzt den vorgefertigten Container von SageMaker Amazon Clarify, der eine Reihe von Funktionen zur Modellüberwachung bietet, darunter Vorschläge für Einschränkungen und die Überprüfung von Einschränkungen anhand einer bestimmten Basislinie. Weitere Informationen finden Sie unter Erste Schritte mit einem SageMaker Clarify Container.

Konfiguration des ClarifyCheck Schritts

Sie können denClarifyCheck Schritt so konfigurieren, dass bei jeder Verwendung in einer Pipeline nur einer der folgenden Prüftypen ausgeführt wird.

  • Überprüfung der Datenverzerrung

  • Überprüfung der Modellabweichung

  • Überprüfung der Erklärbarkeit des Modells

Sie tun dies, indem Sie denclarify_check_config Parameter mit einem der folgenden Prüftypwerte setzen:

  • DataBiasCheckConfig

  • ModelBiasCheckConfig

  • ModelExplainabilityCheckConfig

Mit diesemClarifyCheck Schritt wird ein Verarbeitungsjob gestartet, der den SageMaker vordefinierten Clarificy-Container ausführt und spezielle Konfigurationen für den Prüf- und Verarbeitungsjob erfordert. ClarifyCheckConfigundCheckJobConfig sind Hilfsfunktionen für diese Konfigurationen, die darauf abgestimmt sind, wie der SageMaker Clarificy-Verarbeitungsauftrag zur Überprüfung von Modellverzerrungen, Datenverzerrungen oder Modellerklärbarkeit berechnet wird. Weitere Informationen finden Sie unter Führen Sie SageMaker Clarify Processing-Jobs für Verzerrungsanalysen und Erklärbarkeit aus.

Steuerung des Schrittverhaltens für den Driftcheck

Für diesenClarifyCheck Schritt sind die folgenden zwei booleschen Flags erforderlich, um sein Verhalten zu steuern:

  • skip_check: Dieser Parameter gibt an, ob die Driftprüfung mit der vorherigen Basislinie übersprungen wird oder nicht. Wenn sie auf gesetzt istFalse, muss die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein.

  • register_new_baseline: Dieser Parameter gibt an, ob über die Step-Eigenschaft auf eine neu berechnete Basislinie zugegriffen werden kannBaselineUsedForDriftCheckConstraints. Wenn sie auf gesetzt istFalse, muss auch die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein. Dieser ist über dieBaselineUsedForDriftCheckConstraints Unterkunft zugänglich.

Weitere Informationen finden Sie unter Basisberechnung, Drifterkennung und Lebenszyklus mit ClarifyCheck und QualityCheck Schritten in Amazon SageMaker Model Building Pipelines.

Arbeiten mit Baselines

Sie können optional den angebenmodel_package_group_name, um den vorhandenen Basisplan zu finden, und derClarifyCheck Schritt ruft dasDriftCheckBaselines zuletzt genehmigte Modellpaket in der Modellpaketgruppe ab. Oder Sie können über densupplied_baseline_constraints Parameter eine vorherige Basislinie angeben. Wenn Siemodel_package_group_name sowohl den als auch den angebensupplied_baseline_constraints, verwendet derClarifyCheck Schritt die durch densupplied_baseline_constraints Parameter angegebene Basislinie.

Weitere Informationen zur Verwendung derClarifyCheck Schrittanforderungen finden Sie unter sagemaker.workflow.steps. ClarifyCheckStepim Amazon SageMaker SageMaker SDK für Python. Ein Amazon SageMaker Studio-Notizbuch, das zeigt, wieClarifyCheck Step in SageMaker Pipelines verwendet wird, finden Sie unter sagemaker-pipeline-model-monitor-clarify-steps.ipynb.

Beispiel Erstellen Sie einenClarifyCheck Schritt für die Überprüfung von Datenverzerrungen
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']), label=0, dataset_type="text/csv", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json", model_package_group_name="MyModelPackageGroup" )

QualityCheck Schritt

Sie können diesenQualityCheck Schritt verwenden, um Vorschläge für Basispläne und Abgleichsprüfungen mit einem früheren Basisplan für die Daten- oder Modellqualität in einer Pipeline durchzuführen. Anschließend können Sie Ihre Baselines mit dermodel.register() Methode generieren und registrieren und die Ausgabe dieser Methode anModell Step using übergebenstep_args. Model Monitor kann diese Basislinien für die Driftprüfung Ihrer Modellendpunkte verwenden, sodass Sie nicht separat einen Basislinienvorschlag erstellen müssen. In diesemQualityCheck Schritt können auch Basiswerte für die Drift-Prüfung aus der Modellregistrierung abgerufen werden. DieserQualityCheck Schritt nutzt den vorgefertigten Container von Amazon SageMaker Model Monitor, der über eine Reihe von Funktionen zur Modellüberwachung verfügt, darunter Vorschläge für Einschränkungen, Statistikgenerierung und Überprüfung von Einschränkungen anhand einer Basislinie. Weitere Informationen finden Sie unter Vorgefertigter Container von Amazon SageMaker Model Monitor.

Konfiguration des QualityCheck Schritts

Sie können denQualityCheck Schritt so konfigurieren, dass bei jeder Verwendung in einer Pipeline nur einer der folgenden Prüftypen ausgeführt wird.

  • Datenqualitätsprüfung

  • Qualitätskontrolle des Modells

Sie tun dies, indem Sie denquality_check_config Parameter mit einem der folgenden Prüftypwerte setzen:

  • DataQualityCheckConfig

  • ModelQualityCheckConfig

Mit diesemQualityCheck Schritt wird ein Verarbeitungsjob gestartet, der den vordefinierten Model Monitor-Container ausführt und spezielle Konfigurationen für den Prüf- und Verarbeitungsjob erfordert. DieQualityCheckConfig undCheckJobConfig sind Hilfsfunktionen für diese Konfigurationen, die darauf abgestimmt sind, wie Model Monitor eine Basislinie für die Modellqualitäts- oder Datenqualitätsüberwachung erstellt. Weitere Informationen zu den Modellmonitor-Basisvorschlägen finden Sie unterErstellen einer Baseline undErstellen Sie einen Basisplan für die Modellqualität.

Steuerung des Schrittverhaltens für den Driftcheck

Für diesenQualityCheck Schritt sind die folgenden beiden booleschen Flags erforderlich, um sein Verhalten zu steuern:

  • skip_check: Dieser Parameter gibt an, ob die Driftprüfung mit der vorherigen Basislinie übersprungen wird oder nicht. Wenn sie auf gesetzt istFalse, muss die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein.

  • register_new_baseline: Dieser Parameter gibt an, ob über die SchritteigenschaftenBaselineUsedForDriftCheckConstraints und auf eine neu berechnete Basislinie zugegriffen werden kannBaselineUsedForDriftCheckStatistics. Wenn sie auf gesetzt istFalse, muss auch die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein. Auf diese kann über dieBaselineUsedForDriftCheckStatistics EigenschaftenBaselineUsedForDriftCheckConstraints und zugegriffen werden.

Weitere Informationen finden Sie unter Basisberechnung, Drifterkennung und Lebenszyklus mit ClarifyCheck und QualityCheck Schritten in Amazon SageMaker Model Building Pipelines.

Arbeiten mit Baselines

Sie können eine vorherige Basislinie direkt über diesupplied_baseline_constraints Parametersupplied_baseline_statistics und angeben, oder Sie können einfach den angeben,model_package_group_name und derQualityCheck Schritt ruft dasDriftCheckBaselines zuletzt genehmigte Modellpaket in der Modellpaketgruppe ab. Wenn Sie denmodel_package_group_name, den und angebensupplied_baseline_constraints, verwendet derQualityCheck Schritt den Basisplansupplied_baseline_statistics, der durchsupplied_baseline_constraints undsupplied_baseline_statistics auf dem Prüftyp des von Ihnen ausgeführtenQualityCheck Schritts angegeben ist.

Weitere Informationen zur Verwendung derQualityCheck Schrittanforderungen finden Sie unter sagemaker.workflow.steps. QualityCheckStepim Amazon SageMaker SageMaker SDK für Python. Ein Amazon SageMaker Studio-Notizbuch, das zeigt, wieQualityCheck Step in SageMaker Pipelines verwendet wird, finden Sie unter sagemaker-pipeline-model-monitor-clarify-steps.ipynb.

Beispiel Erstellen Sie einenQualityCheck Schritt für die Datenqualitätsprüfung
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json", model_package_group_name="MyModelPackageGroup" )

EMR-Schritt

Sie können den EMR-Schritt von Amazon SageMaker Model Building Pipelines verwenden, um Amazon EMR-Schritte auf einem laufenden Amazon EMR-Cluster zu verarbeiten, oder die Pipeline einen Amazon EMR-Cluster für Sie erstellen und verwalten lassen. Weitere Informationen über Amazon EMR finden Sie unter Erste Schritte mit Amazon EMR.

Für den EMR-SchrittEMRStepConfig sind der Speicherort der JAR-Datei, die vom Amazon EMR-Cluster verwendet werden soll, sowie alle zu übergebenden Argumente erforderlich. Sie geben auch die Amazon EMR-Cluster-ID an, wenn Sie den Schritt auf einem laufenden EMR-Cluster ausführen möchten, oder die Cluster-Konfiguration, wenn der EMR-Schritt auf einem Cluster ausgeführt werden soll, den er für Sie erstellt, verwaltet und beendet. Die folgenden Abschnitte enthalten Beispiele und Links zu Beispielnotizbüchern, in denen beide Methoden demonstriert werden.

Anmerkung
  • EMR-Schritte erfordern, dass die an Ihre Pipeline übergebene Rolle über zusätzliche Berechtigungen verfügt. Sie sollten die AWSverwaltete Richtlinie:AmazonSageMakerPipelinesIntegrations an Ihre Pipeline-Rolle anhängen oder sicherstellen, dass die Rolle die in dieser Richtlinie enthaltenen Berechtigungen enthält.

  • EMR Step wird weder für EMR Serverless noch für Amazon EMR on EKS unterstützt.

  • Wenn Sie einen EMR-Schritt auf einem laufenden Cluster verarbeiten, können Sie nur einen Cluster verwenden, der sich in einem der folgenden Zustände befindet:STARTINGBOOTSTRAPPING,RUNNING, oderWAITING.

  • Wenn Sie EMR-Schritte in einem laufenden Cluster verarbeiten, können Sie in einemPENDING Zustand in einem EMR-Cluster höchstens 256 EMR-Schritte haben. EMR-Schritte, die über diesen Grenzwert hinausgehen, führen zu einem Fehler bei der Pipeline-Ausführung. Sie können erwägen, es zu verwendenRichtlinie für Wiederholungsversuche für Pipeline-Schritte.

  • Sie können entweder die Cluster-ID oder die Clusterkonfiguration angeben, aber nicht beides.

  • Der EMR-Schritt stützt sich darauf EventBridge , dass Amazon Änderungen im EMR-Schritt oder Clusterstatus überwacht. Wenn Sie Ihren Amazon EMR-Job in einem laufenden Cluster verarbeiten, verwendet der EMR-Schritt dieSageMakerPipelineExecutionEMRStepStatusUpdateRule Regel, um den Status des EMR-Schritts zu überwachen. Wenn Sie Ihren Job in einem Cluster verarbeiten, den der EMR-Schritt für Sie erstellt, verwendet der Schritt dieSageMakerPipelineExecutionEMRClusterStatusRule Regel, um Änderungen des Clusterstatus zu überwachen. Wenn Sie eine dieser EventBridge Regeln in IhremAWS Konto sehen, löschen Sie sie nicht, da Ihr EMR-Schritt sonst möglicherweise nicht abgeschlossen wird.

Starten Sie einen neuen Job in einem laufenden Amazon EMR-Cluster

Wenn Sie einen neuen Job auf einem laufenden Amazon EMR-Cluster starten möchten, übergeben Sie die Cluster-ID als Zeichenfolge an dascluster_id Argument vonEMRStep. Das folgende Beispiel illustriert diese Vorgehensweise.

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )

Ein Beispielnotizbuch, das Sie durch ein vollständiges Beispiel führt, finden Sie unter SageMaker Pipelines EMR Step With Running EMR Cluster.

Starten Sie einen neuen Job in einem neuen Amazon EMR-Cluster

Wenn Sie einen neuen Job in einem neuen Cluster starten möchten, der für SieEMRStep erstellt wird, geben Sie Ihre Clusterkonfiguration als Wörterbuch mit derselben Struktur wie eine RunJobFlowAnfrage an. Nehmen Sie die folgenden Felder jedoch nicht in Ihre Clusterkonfiguration auf:

  • [Name]

  • [Steps]

  • [AutoTerminationPolicy]

  • [Instances][KeepJobFlowAliveWhenNoSteps]

  • [Instances][TerminationProtected]

Alle anderenRunJobFlow Argumente sind für die Verwendung in Ihrer Clusterkonfiguration verfügbar. Einzelheiten zur Anforderungssyntax finden Sie unter RunJobFlow.

Im folgenden Beispiel wird eine Clusterkonfiguration an eine EMR-Schrittdefinition übergeben, wodurch der Schritt aufgefordert wird, einen neuen Job auf einem neuen EMR-Cluster zu starten. Die EMR-Clusterkonfiguration in diesem Beispiel umfasst Spezifikationen für primäre und zentrale EMR-Clusterknoten. Weitere Informationen zu Amazon EMR-Knotentypen finden Sie unter Grundlegendes zu Knotentypen: Primär-, Kern- und Aufgabenknoten.

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_step_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) # include your cluster configuration as a dictionary emr_cluster_config = { "Applications": [ { "Name": "Spark", } ], "Instances":{ "InstanceGroups":[ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge" }, { "InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge" } ] }, "BootstrapActions":[], "ReleaseLabel": "emr-6.6.0", "JobFlowRole": "job-flow-role", "ServiceRole": "service-role" } emr_step = EMRStep( name="emr-step", cluster_id=None, display_name="emr_step", description="MyEMRStepDescription", step_config=emr_step_config, cluster_config=emr_cluster_config )

Ein Beispielnotizbuch, das Sie durch ein vollständiges Beispiel führt, finden Sie unter SageMaker Pipelines EMR Step With Cluster Lifecycle Management.

Fehlgeschlagener Schritt

Sie verwenden aFailStep, um die Ausführung einer Amazon SageMaker Model Building Pipelines zu beenden, wenn eine gewünschte Bedingung oder ein gewünschter Zustand nicht erreicht ist, und um die Ausführung dieser Pipeline als fehlgeschlagen zu markieren. FailStepAußerdem können Sie eine benutzerdefinierte Fehlermeldung eingeben, die die Ursache für den Ausführungsfehler der Pipeline angibt.

Anmerkung

Wenn ein Pipeline-SchrittFailStep und andere Pipeline-Schritte gleichzeitig ausgeführt werden, wird die Pipeline erst beendet, wenn alle gleichzeitigen Schritte abgeschlossen sind.

Einschränkungen bei der VerwendungFailStep

  • Sie können derDependsOn ListeFailStep der anderen Schritte keinen hinzufügen. Weitere Informationen finden Sie unter Benutzerdefinierte Abhängigkeit zwischen den Schritten.

  • Andere Schritte können nicht auf die verweisenFailStep. Dies ist immer der letzte Schritt bei der Ausführung einer Pipeline.

  • Sie können eine Pipeline-Ausführung, die mit einem endet, nicht erneut versuchenFailStep.

Sie können dasFailStepErrorMessage in Form einer statischen Textzeichenfolge erstellen. Alternativ können Sie auch Pipeline-Parameter, einen Join-Vorgang oder andere Schritteigenschaften verwenden, um eine aussagekräftigere Fehlermeldung zu erstellen.

Der folgende Beispielcodeausschnitt verwendet einenFailStep mit einem mit Pipeline-ParameternErrorMessage konfiguriertenJoin Vorgang.

from sagemaker.workflow.fail_step import FailStep from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5) step_fail = FailStep( name="AbaloneMSEFail", error_message=Join( on=" ", values=["Execution failed due to MSE >", mse_threshold_param] ), )

Eigenschaften des Schritts

Dasproperties Attribut wird verwendet, um Datenabhängigkeiten zwischen den Schritten in der Pipeline hinzuzufügen. Diese Datenabhängigkeiten werden dann von SageMaker Pipelines verwendet, um die DAG aus der Pipeline-Definition zu erstellen. Diese Eigenschaften können als Platzhalterwerte referenziert werden und werden zur Laufzeit aufgelöst.

Dasproperties Attribut eines SageMaker Pipelines-Schritts entspricht dem Objekt, das durch einenDescribe Aufruf für den entsprechenden SageMaker Jobtyp zurückgegeben wird. Für jeden Jobtyp gibt derDescribe Aufruf das folgende Antwortobjekt zurück:

Um zu überprüfen, welche Eigenschaften bei der Erstellung von Datenabhängigkeiten für jeden Schritttyp referenzierbar sind, finden Sie im Amazon SageMaker Python SDK unter Datenabhängigkeit — Eigenschaftsreferenz.

Schrittparallelität

Wenn ein Schritt von keinem anderen Schritt abhängt, wird er sofort nach der Ausführung der Pipeline ausgeführt. Die parallel Ausführung zu vieler Pipeline-Schritte kann jedoch die verfügbaren Ressourcen schnell erschöpfen. Steuern Sie die Anzahl der gleichzeitigen Schritte für eine Pipeline-Ausführung mitParallelismConfiguration.

Im folgenden Beispiel wirdParallelismConfiguration das Limit für gleichzeitige Schritte auf fünf festgelegt.

pipeline.create( parallelism_config=ParallelismConfiguration(5), )

Datenabhängigkeit zwischen den Schritten

Sie definieren die Struktur Ihrer DAG, indem Sie die Datenbeziehungen zwischen den Schritten angeben. Um Datenabhängigkeiten zwischen Schritten zu erstellen, übergeben Sie die Eigenschaften eines Schritts als Eingabe an einen anderen Schritt in der Pipeline. Der Schritt, der die Eingabe empfängt, wird erst gestartet, nachdem die Ausführung des Schritts, der die Eingabe bereitstellt, abgeschlossen ist.

Eine Datenabhängigkeit verwendet die JsonPath Notation im folgenden Format. Dieses Format durchläuft die JSON-Eigenschaftsdatei, was bedeutet, dass Sie so viele <property>Instanzen anhängen können, bis die gewünschte verschachtelte Eigenschaft in der Datei erreicht ist. Weitere Informationen zur JsonPath Notation finden Sie im JsonPath Repo.

<step_name>.properties.<property>.<property>

Im folgenden Beispiel wird gezeigt, wie Sie einen Amazon S3-Bucket mithilfe derProcessingOutputConfig Eigenschaft eines Verarbeitungsschritten angeben.

step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri

Um die Datenabhängigkeit zu erstellen, übergeben Sie den Bucket wie folgt an einen Trainingsschritt.

from sagemaker.workflow.pipeline_context import PipelineSession sklearn_train = SKLearn(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="CensusTrain", step_args=sklearn_train.fit(inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri )) )

Um zu überprüfen, welche Eigenschaften bei der Erstellung von Datenabhängigkeiten für jeden Schritttyp referenzierbar sind, finden Sie im Amazon SageMaker Python SDK unter Datenabhängigkeit — Eigenschaftsreferenz.

Benutzerdefinierte Abhängigkeit zwischen den Schritten

Wenn Sie eine Datenabhängigkeit angeben, stellt SageMaker Pipelines die Datenverbindung zwischen den Schritten bereit. Alternativ kann ein Schritt auf die Daten aus einem vorherigen Schritt zugreifen, ohne SageMaker Pipelines direkt zu verwenden. In diesem Fall können Sie eine benutzerdefinierte Abhängigkeit erstellen, die SageMaker Pipelines anweist, einen Schritt erst zu starten, nachdem ein weiterer Schritt abgeschlossen ist. Sie erstellen eine benutzerdefinierte Abhängigkeit, indem Sie dasDependsOn Attribut eines Schritts angeben.

Als Beispiel wird im Folgenden ein Schritt definiert, der erst beginntC, nachdem sowohl der SchrittA als auch der SchrittB abgeschlossen sind.

{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }

SageMaker Pipelines löst eine Validierungsausnahme aus, wenn die Abhängigkeit eine zyklische Abhängigkeit erzeugen würde.

Im folgenden Beispiel wird ein Trainingsschritt erstellt, der beginnt, nachdem ein Verarbeitungsschritt abgeschlossen ist.

processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])

Im folgenden Beispiel wird ein Trainingsschritt erstellt, der erst beginnt, wenn zwei verschiedene Verarbeitungsschritte abgeschlossen sind.

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])

Im Folgenden finden Sie eine alternative Möglichkeit, die benutzerdefinierte Abhängigkeit zu erstellen.

training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])

Im folgenden Beispiel wird ein Trainingsschritt erstellt, der Eingaben von einem Verarbeitungsschritt empfängt und darauf wartet, dass ein anderer Verarbeitungsschritt abgeschlossen ist.

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])

Das folgende Beispiel zeigt, wie eine Zeichenkettenliste der benutzerdefinierten Abhängigkeiten eines Schritts abgerufen wird.

custom_dependencies = training_step.depends_on

Verwenden Sie ein benutzerdefiniertes Bild in einem Schritt

Sie können jedes der verfügbaren SageMaker Deep Learning Container-Images verwenden, wenn Sie einen Schritt in Ihrer Pipeline erstellen.

Sie können auch Ihren eigenen Container mit Pipeline-Schritten verwenden. Da Sie in Amazon SageMaker Studio kein Image erstellen können, müssen Sie Ihr Image mit einer anderen Methode erstellen, bevor Sie es mit Amazon SageMaker Model Building Pipelines verwenden können.

Um bei der Erstellung der Schritte für Ihre Pipeline Ihren eigenen Container zu verwenden, fügen Sie die Bild-URI in die Estimator-Definition ein. Weitere Informationen zur Verwendung Ihres eigenen Containers mit SageMaker finden Sie unter Verwenden von Docker-Containern mit SageMaker.