Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Pipeline-Schritte
SageMaker Pipelines bestehen aus Schritten. Diese Schritte definieren die Aktionen, die die Pipeline ausführt, und die Beziehungen zwischen den Schritten mithilfe von Eigenschaften.
Themen
Schritttypen
Im Folgenden werden die Anforderungen der einzelnen Schritttypen beschrieben und ein Beispiel für die Implementierung des Schritts bereitgestellt. Dabei handelt es sich nicht um funktionale Implementierungen, da sie nicht die benötigten Ressourcen und Eingaben bereitstellen. Ein Tutorial, das diese Schritte implementiert, finden Sie unter SageMaker Pipelines erstellen und verwalten.
Anmerkung
Sie können auch einen Schritt aus Ihrem lokalen Machine-Learning-Code erstellen, indem Sie ihn mit dem @step
Decorator in einen SageMaker Pipelines-Schritt konvertieren. Weitere Informationen finden Sie unter @step Decorator.
Amazon SageMaker Model Building Pipelines unterstützen die folgenden Schritttypen:
@step Decorator
Sie können mit dem @step
Decorator einen Schritt aus lokalem Machine-Learning-Code erstellen. Nachdem Sie Ihren Code getestet haben, können Sie die Funktion in einen SageMaker Pipeline-Schritt konvertieren, indem Sie sie mit dem @step
Decorator kommentieren. SageMaker Pipelines erstellt und führt eine Pipeline aus, wenn Sie die Ausgabe der @step
-dekorierten Funktion als Schritt an Ihre Pipeline übergeben. Sie können auch eine mehrstufige DAG-Pipeline erstellen, die eine oder mehrere von dekorierte Funktionen sowie herkömmliche SageMaker Pipeline@step
-Schritte enthält. Weitere Informationen zum Erstellen eines Schritts mit @step
Decorator finden Sie unter L ift-and-shift Python-Code mit dem @step Decorator.
Verarbeitungsschritt
Verwenden Sie einen Verarbeitungsschritt, um einen Verarbeitungsauftrag für die Datenverarbeitung zu erstellen. Weitere Informationen zur Verarbeitung von Auftrags finden Sie unter Daten verarbeiten und Modelle auswerten.
Ein Verarbeitungsschritt erfordert einen Prozessor, ein Python-Skript, das den Verarbeitungscode definiert, Ausgaben für die Verarbeitung und Auftrag-Argumente. Das folgende Beispiel zeigt, wie man eine ProcessingStep
-Definition erstellt.
from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='1.0-1', role=
<role>
, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep inputs = [ ProcessingInput(source=
<input_data>
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] step_process = ProcessingStep( name="AbaloneProcess", step_args = sklearn_processor.run(inputs=inputs, outputs=outputs, code="abalone/preprocessing.py") )
Übergeben Sie Laufzeitparameter
Das folgende Beispiel zeigt, wie Laufzeitparameter von einem PySpark Prozessor an einen übergeben werdenProcessingStep
.
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.spark.processing import PySparkProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep pipeline_session = PipelineSession() pyspark_processor = PySparkProcessor( framework_version='2.4', role=
<role>
, instance_type='ml.m5.xlarge', instance_count=1, sagemaker_session=pipeline_session, ) step_args = pyspark_processor.run( inputs=[ProcessingInput(source=<input_data>
, destination="/opt/ml/processing/input"),], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="preprocess.py", arguments=None, ) step_process = ProcessingStep( name="AbaloneProcess", step_args=step_args, )
Weitere Informationen zu den Anforderungen an Verarbeitungsschritte finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.ProcessingStep
Trainingsschritt
Sie verwenden einen Trainingsschritt, um einen Schulungsauftrag zum Trainieren eines Modells zu erstellen. Weitere Informationen zu Schulungsaufträgen finden Sie unter Trainieren eines Modells mit Amazon SageMaker.
Ein Trainingsschritt erfordert einen Schätzer sowie Eingaben von Trainings- und Validierungsdaten. Das folgende Beispiel zeigt, wie Sie eine TrainingStep
-Definition erstellen. Weitere Informationen zu den Anforderungen an Trainingsschritte finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.TrainingStep
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep from sagemaker.xgboost.estimator import XGBoost pipeline_session = PipelineSession() xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session) step_args = xgb_estimator.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } ) step_train = TrainingStep( name="TrainAbaloneModel", step_args=step_args, )
Optimierungsschritt
Sie verwenden einen Optimierungsschritt, um einen Hyperparameter-Tuning-Auftrag zu erstellen, der auch als Hyperparameter-Optimierung (HPO) bezeichnet wird. Ein Hyperparameter-Optimierungsauftrag führt mehrere Trainingsauftrags aus, von denen jeder eine Modellversion erzeugt. Weitere Informationen zur Abstimmung der Hyperparameter finden Sie unter Durchführen der automatischen Modelloptimierung mit SageMaker.
Der Optimierungsauftrag ist mit dem SageMaker Experiment für die Pipeline verknüpft, wobei die Trainingsaufträge als Tests erstellt wurden. Weitere Informationen finden Sie unter Integration von Experimenten.
Ein Optimierungsschritt erfordert einen HyperparameterTunerwarm_start_config
-Parameter des HyperparameterTuner
angeben. Weitere Informationen zur Hyperparameteroptimierung und zum Warmstart finden Sie unter Durchführen eines Hyperparameter-Optimierungsauftrags mit Warmstart.
Sie verwenden die Methode get_top_model_s3_uri
Wichtig
Optimierungsschritte wurden in Amazon SageMaker Python SDK v2.48.0 und Amazon SageMaker Studio Classic v3.8.0 eingeführt. Sie müssen Studio Classic aktualisieren, bevor Sie einen Optimierungsschritt verwenden, sonst wird die Pipeline-DAG nicht angezeigt. Informationen zum Aktualisieren von Studio Classic finden Sie unter Studio SageMaker Classic herunterfahren und aktualisieren.
Das folgende Beispiel zeigt, wie man eine TuningStep
-Definition erstellt.
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession()) step_tuning = TuningStep( name = "HPTuning", step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://
my-bucket/my-data
")) )
Holen Sie sich die beste Modellversion
Das folgende Beispiel zeigt, wie Sie mit der get_top_model_s3_uri
Methode die beste Modellversion aus dem Tuning-Auftrag abrufen können. Die 50 leistungsstärksten Versionen sind höchstens verfügbar, geordnet nach HyperParameterTuningJobObjective. Das Argument top_k
ist ein Index für die Versionen, wobei top_k=0
die leistungsstärkste und top_k=49
die leistungsschwächste Version ist.
best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )
Weitere Informationen zu den Anforderungen an Optimierungsschritte finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.TuningStep
AutoML-Schritt
Verwenden Sie die AutoML
Anmerkung
Derzeit unterstützt der AutoML-Schritt nur den Ensembling-Trainingsmodus.
Das folgende Beispiel zeigt, wie eine Definition mit AutoMLStep
erstellt werden kann.
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.automl_step import AutoMLStep pipeline_session = PipelineSession() auto_ml = AutoML(..., role="
<role>
", target_attribute_name="my_target_attribute_name
", mode="ENSEMBLING
", sagemaker_session=pipeline_session) input_training = AutoMLInput( inputs="s3://my-bucket/my-training-data
", target_attribute_name="my_target_attribute_name
", channel_type="training", ) input_validation = AutoMLInput( inputs="s3://my-bucket/my-validation-data
", target_attribute_name="my_target_attribute_name
", channel_type="validation", ) step_args = auto_ml.fit( inputs=[input_training, input_validation] ) step_automl = AutoMLStep( name="AutoMLStep", step_args=step_args, )
Holen Sie sich die beste Modellversion
Der AutoML-Schritt trainiert automatisch mehrere Modellkandidaten. Sie können das Modell mit der besten Zielmetrik aus dem AutoML-Auftrag abrufen, indem Sie die get_best_auto_ml_model
Methode und ein IAM role
verwenden, um wie folgt auf Modellartefakte zuzugreifen.
best_model = step_automl.get_best_auto_ml_model(
role=<role>
)
Weitere Informationen finden Sie im AutoML
Modell Schritt
Verwenden Sie eine ModelStep
, um ein SageMaker Modell zu erstellen oder zu registrieren. Weitere Informationen zu denModelStep
Anforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Model_Step.ModelStep
Erstellen eines Modells
Sie können ein verwendenModelStep
, um ein SageMaker Modell zu erstellen. Ein ModelStep
erfordert Modellartefakte und Informationen über den SageMaker Instance-Typ, den Sie zum Erstellen des Modells verwenden müssen. Weitere Informationen zu SageMaker Modellen finden Sie unter Trainieren eines Modells mit Amazon SageMaker.
Das folgende Beispiel zeigt, wie man eine ModelStep
-Definition erstellt.
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.model import Model from sagemaker.workflow.model_step import ModelStep step_train = TrainingStep(...) model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=PipelineSession(), role=role, ) step_model_create = ModelStep( name="MyModelCreationStep", step_args=model.create(instance_type="ml.m5.xlarge"), )
Registrieren eines Modells
Sie können eine sagemaker.model.Model
oder eine bei sagemaker.pipeline.PipelineModel
der Amazon- SageMaker Modellregistrierung ModelStep
registrieren. Ein PipelineModel
stellt eine Inferenzpipeline dar, ein Modell, das aus einer linearen Abfolge von Containern besteht, die Inferenzanforderungen verarbeiten. Weitere Informationen über die Registrierung eines Modells finden Sie unter Modelle mit Model Registry registrieren und bereitstellen.
Das folgende Beispiel zeigt, wie Sie eine ModelStep
erstellen, die eine PipelineModel
registriert.
import time from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel pipeline_session = PipelineSession() code_location = 's3://{0}/{1}/code'.format(
bucket_name
, prefix) sklearn_model = SKLearnModel( model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=pipeline_session, py_version='py3' ) xgboost_model = XGBoostModel( model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=pipeline_session, role=role ) from sagemaker.workflow.model_step import ModelStep from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model, xgboost_model], role=role,sagemaker_session=pipeline_session, ) register_model_step_args = pipeline_model.register( content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', ) step_model_registration = ModelStep( name="AbaloneRegisterModel", step_args=register_model_step_args, )
CreateModel Schritt
Wichtig
Wir empfehlen die Verwendung von Modell Schritt, um Modelle ab v2.90.0 des SageMaker Python SDK zu erstellen. funktioniert CreateModelStep
weiterhin in früheren Versionen des SageMaker Python SDK, wird aber nicht mehr aktiv unterstützt.
Sie verwenden einen CreateModel
Schritt, um ein SageMaker Modell zu erstellen. Weitere Informationen zu SageMaker Modellen finden Sie unter Trainieren eines Modells mit Amazon SageMaker.
Ein Schritt zum Erstellen eines Modells erfordert Modellartefakte und Informationen über den SageMaker Instance-Typ, den Sie zum Erstellen des Modells verwenden müssen. Das folgende Beispiel zeigt, wie Sie eine CreateModel
-Schrittdefinition erstellen. Weitere Informationen zuCreateModel
den Schrittanforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.CreateModelStep
from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )
RegisterModel Schritt
Wichtig
Wir empfehlen, zu verwendenModell Schritt, um Modelle ab v2.90.0 des SageMaker Python SDK zu registrieren. funktioniert RegisterModel
weiterhin in früheren Versionen des SageMaker Python SDK, wird aber nicht mehr aktiv unterstützt.
Sie verwenden einen RegisterModel
Schritt, um einen Sagemaker.model.ModelPipelineModel
stellt eine Inferenzpipeline dar, ein Modell, das aus einer linearen Abfolge von Containern besteht, die Inferenzanforderungen verarbeiten.
Weitere Informationen über die Registrierung eines Modells finden Sie unter Modelle mit Model Registry registrieren und bereitstellen. Weitere Informationen zu RegisterModel
den Schrittanforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Step_Sammlungen.RegisterModel
Das folgende Beispiel zeigt, wie Sie einen Schritt RegisterModel
erstellen, der eine PipelineModel
registriert.
import time from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel code_location = 's3://{0}/{1}/code'.format(
bucket_name
, prefix) sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/
', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=sagemaker_session, py_version='py3') xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/
', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=sagemaker_session, role=role) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session) step_register = RegisterModel( name="AbaloneRegisterModel
", model=pipeline_model, content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium
", "ml.m5.xlarge
"], transform_instances=["ml.m5.xlarge
"], model_package_group_name='sipgroup
', )
Wenn model
nicht angegeben, benötigt der Registermodellschritt einen Schätzer, wie im folgenden Beispiel gezeigt.
from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="
AbaloneRegisterModel
", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge
"], transform_instances=["ml.m5.xlarge
"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )
Transformationsschritt
Sie verwenden einen Transformationsschritt für die Batch-Transformation, um die Inferenz für einen gesamten Datensatz durchzuführen. Weitere Informationen zur Batch-Transformation finden Sie unter Ausführen von Stapeltransformationen mit Inferenz-Pipelines.
Ein Transformationsschritt erfordert einen Transformator und die Daten, für die die Batch-Transformation ausgeführt werden soll. Das folgende Beispiel zeigt, wie Sie eine Transform
-Schrittdefinition erstellen. Weitere Informationen zuTransform
den Schrittanforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.TransformStep
from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.transformer import Transformer from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep transformer = Transformer(..., sagemaker_session=PipelineSession()) step_transform = TransformStep( name="AbaloneTransform", step_args=transformer.transform(data="s3://
my-bucket/my-data
"), )
Bedingungsschritt
Sie verwenden einen Bedingungsschritt, um den Zustand der Schritteigenschaften zu bewerten, um zu beurteilen, welche Maßnahme als Nächstes in der Pipeline ergriffen werden sollte.
Ein Bedingungsschritt erfordert eine Liste von Bedingungen, eine Liste von Schritten, die ausgeführt werden sollen, wenn die Bedingung zu true
ausgewertet wird, und eine Liste von Schritten, die ausgeführt werden sollen, wenn die Bedingung zu false
ausgewertet wird. Das folgende Beispiel zeigt, wie Sie eine ConditionStep
-Definition erstellen.
Einschränkungen
-
SageMaker Pipelines unterstützt nicht die Verwendung verschachtelter Bedingungsschritte. Sie können einen Bedingungsschritt nicht als Eingabe für einen anderen Bedingungsschritt übergeben.
-
Ein Bedingungsschritt kann nicht identische Schritte in beiden Zweigen verwenden. Wenn Sie in beiden Zweigen dieselbe Schrittfunktionalität benötigen, duplizieren Sie den Schritt und geben Sie ihm einen anderen Namen.
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )
Weitere Informationen zu denConditionStep
Anforderungen finden Sie unter sagemaker.workflow.condition_step.ConditionStep
Rückrufschritt
Sie verwenden einen Callback
Schritt, um zusätzliche Prozesse und AWS Services in Ihren Workflow zu integrieren, die nicht direkt von Amazon SageMaker Model Building Pipelines bereitgestellt werden. Wenn ein Callback
Schritt ausgeführt wird, erfolgt das folgende Verfahren:
-
SageMaker Pipelines sendet eine Nachricht an eine vom Kunden angegebene Amazon Simple Queue Service (Amazon SQS)-Warteschlange. Die Nachricht enthält ein von SageMaker Pipelines generiertes Token und eine vom Kunden bereitgestellte Liste der Eingabeparameter. Nach dem Senden der Nachricht SageMaker wartetPipelines auf eine Antwort vom Kunden.
-
Der Kunde ruft die Nachricht aus der Amazon-SQS-Warteschlange ab und startet seinen benutzerdefinierten Prozess.
-
Wenn der Vorgang abgeschlossen ist, ruft der Kunde eine der folgenden APIs auf und sendet das von SageMaker Pipelines generierte Token:
-
SendPipelineExecutionStepSuccess, zusammen mit einer Liste von Ausgabeparametern
-
SendPipelineExecutionStepFailure, zusammen mit einem Grund für das Fehlschlagen
-
-
Der API-Aufruf bewirkt, dass SageMaker Pipelines entweder den Pipeline-Prozess fortsetzen oder den Prozess fehlschlagen.
Weitere Informationen zu Callback
den Schrittanforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Callback_Step.CallbackStep
Wichtig
Callback
-Schritte wurden in Amazon SageMaker Python SDK v2.45.0 und Amazon SageMaker Studio Classic v3.6.2 eingeführt. Sie müssen Studio Classic aktualisieren, bevor Sie einen Callback
Schritt verwenden, sonst wird die Pipeline-DAG nicht angezeigt. Informationen zum Aktualisieren von Studio Classic finden Sie unter Studio SageMaker Classic herunterfahren und aktualisieren.
Das folgende Beispiel zeigt eine Implementierung des vorherigen Verfahrens.
from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
Anmerkung
Die Ausgabeparameter für CallbackStep
sollten nicht verschachtelt sein. Wenn Sie beispielsweise ein verschachteltes Wörterbuch als Ausgabeparameter verwenden, wird das Wörterbuch als eine einzelne Zeichenfolge behandelt (z. B. {"output1": "{\"nested_output1\":\"my-output\"}"}
). Wenn Sie einen verschachtelten Wert angeben und versuchen, auf einen bestimmten Ausgabeparameter zu verweisen, wird ein Client-Fehler ausgelöst, der nicht erneut versucht werden kann.
Verhalten wird gestoppt
Ein Pipelineprozess wird nicht gestoppt, während ein Callback
Schritt ausgeführt wird.
Wenn Sie StopPipelineExecution für einen Pipeline-Prozess mit einem laufenden Callback
Schritt aufrufen, sendet SageMaker Pipelines eine zusätzliche Amazon SQS-Nachricht an die angegebene SQS-Warteschlange. Der Hauptteil der SQS-Nachricht enthält ein Statusfeld, das auf Stopping
gesetzt ist. Im Folgenden wird ein Beispiel für einen SQS-Nachrichtenkörper gezeigt.
{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }
Sie sollten Ihrem Amazon SQS-Nachrichtenverbraucher Logik hinzufügen, um nach Erhalt der Nachricht alle erforderlichen Maßnahmen (z. B. Ressourcenbereinigung) zu ergreifen, gefolgt von einem Aufruf von SendPipelineExecutionStepSuccess
oder SendPipelineExecutionStepFailure
.
Nur wenn SageMaker Pipelines einen dieser Aufrufe erhält, wird der Pipeline-Prozess gestoppt.
Lambda-Schritt
Sie verwenden einen Lambda-Schritt, um eine AWS Lambda Funktion auszuführen. Sie können eine vorhandene Lambda-Funktion ausführen oder eine neue Lambda-Funktion SageMaker erstellen und ausführen. Ein Notebook, das zeigt, wie ein Lambda-Schritt in einer SageMaker Pipeline verwendet wird, finden Sie unter sagemaker-pipelines-lambda-step.ipynb
Wichtig
Lambda-Schritte wurden in Amazon SageMaker Python SDK v2.51.0 und Amazon SageMaker Studio Classic v3.9.1 eingeführt. Sie müssen Studio Classic aktualisieren, bevor Sie einen Lambda-Schritt verwenden, sonst wird die Pipeline-DAG nicht angezeigt. Informationen zum Aktualisieren von Studio Classic finden Sie unter Studio SageMaker Classic herunterfahren und aktualisieren.
SageMaker stellt die Klasse sagemaker.lambda_helper.LambdaLambda
hat die folgende Signatur.
Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )
Die Klasse sagemaker.workflow.lambda_step.LambdaSteplambda_func
Argument vom Typ Lambda
. Um eine bestehende Lambda-Funktion aufzurufen, muss nur der Amazon Resource Name (ARN) der Funktion an function_arn
übergeben werden. Wenn Sie keinen Wert für function_arn
angeben, müssen Sie handler
und eine der folgenden Angaben machen:
-
zipped_code_dir
— Der Pfad der komprimierten Lambda-Funktions3_bucket
— Amazon S3-Bucket, wozipped_code_dir
hochgeladen werden soll -
script
— Der Pfad der Lambda-Funktionsskriptdatei
Das folgende Beispiel zeigt, wie eine Lambda
Schrittdefinition erstellt wird, die eine vorhandene Lambda-Funktion aufruft.
from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )
Das folgende Beispiel zeigt, wie Sie eine Lambda
Schrittdefinition erstellen, die mithilfe eines Lambda-Funktionsskripts eine Lambda-Funktion erstellt und aufruft.
from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )
Eingaben und Ausgaben
Wenn Ihre Lambda
Funktion Eingaben oder Ausgaben hat, müssen diese ebenfalls in Ihrem Schritt definiert werden. Lambda
Anmerkung
Eingabe- und Ausgabeparameter sollten nicht verschachtelt sein. Wenn Sie beispielsweise ein verschachteltes Wörterbuch als Ausgabeparameter verwenden, wird das Wörterbuch als eine einzelne Zeichenfolge behandelt (z. B. {"output1": "{\"nested_output1\":\"my-output\"}"}
). Wenn Sie einen verschachtelten Wert angeben und später versuchen, darauf zu verweisen, wird ein Client-Fehler ausgegeben, der nicht erneut versucht werden kann.
Bei der Definition des Lambda
Schritts inputs
muss es sich um ein Wörterbuch mit Schlüssel-Wert-Paaren handeln. Jeder Wert des inputs
Wörterbuchs muss ein primitiver Typ sein (Zeichenfolge, Ganzzahl oder Gleitkommazahl). Verschachtelte Objekte werden nicht unterstützt. Bleibt der Wert für inputs
undefiniert, wird der Wert für None
verwendet.
Der outputs
Wert muss eine Liste von Schlüsseln sein. Diese Schlüssel beziehen sich auf ein Wörterbuch, das in der Ausgabe der Lambda
Funktion definiert ist. Wie bei inputs
müssen diese Schlüssel primitive Typen sein, und verschachtelte Objekte werden nicht unterstützt.
Timeout und Verhalten beim Stoppen
Die Lambda
Klasse hat ein timeout
Argument, das die maximale Zeit angibt, während der die Lambda-Funktion ausgeführt werden kann. Der Standardwert ist 120 Sekunden und der Höchstwert 10 Minuten. Wenn die Lambda-Funktion ausgeführt wird, wenn das Timeout erreicht ist, schlägt der Lambda-Schritt fehl. Die Lambda-Funktion wird jedoch weiterhin ausgeführt.
Ein Pipelineprozess kann nicht gestoppt werden, während ein Lambda-Schritt ausgeführt wird, da die durch den Lambda-Schritt aufgerufene Lambda-Funktion nicht gestoppt werden kann. Wenn Sie versuchen, den Prozess zu beenden, während die Lambda-Funktion ausgeführt wird, wartet die Pipeline, bis die Lambda-Funktion beendet ist oder bis das Timeout erreicht ist, je nachdem, was zuerst eintritt, und stoppt dann. Wenn die Lambda-Funktion beendet wird, lautet der Status des Pipeline-Prozesses Stopped
. Wenn die Zeitüberschreitung erreicht ist, lautet der Status des Pipeline-Prozesses Failed
.
ClarifyCheck Schritt
Sie können diesen ClarifyCheck
Schritt verwenden, um die Ausgangsabweichung anhand früherer Basislinien zu überprüfen, um die Verzerrungsanalyse und die Erklärbarkeit des Modells zu verbessern. Mit der model.register()
Methode können Sie dann Ihre Baselines erstellen und registrieren und die Ausgabe dieser Methode mit Modell Schritt an step_args
übergeben. Diese Baselines für die Abweichungsprüfung können von Amazon SageMaker Model Monitor für Ihre Modellendpunkte verwendet werden, sodass Sie keinen separaten Baseline-Vorschlag machen müssen. Bei diesem ClarifyCheck
Schritt können auch Basiswerte für die Driftprüfung aus der Modellregistrierung abgerufen werden. Der ClarifyCheck
Schritt nutzt den vorgefertigten Amazon SageMaker Clarify-Container, der eine Reihe von Modellüberwachungsfunktionen bietet, einschließlich des Vorschlags für Einschränkungen und der Validierung von Einschränkungen anhand einer bestimmten Baseline. Weitere Informationen finden Sie unter Erste Schritte mit einem SageMaker Clarify-Container.
Konfigurieren des ClarifyCheck Schritts
Sie können den ClarifyCheck
Schritt so konfigurieren, dass bei jeder Verwendung in einer Pipeline nur einer der folgenden Prüftypen durchgeführt wird.
-
Prüfung auf Datenverzerrung
-
Überprüfung der Modellverzerrung
-
Überprüfung der Erklärbarkeit des Modells
Dazu setzen Sie den clarify_check_config
Parameter mit einem der folgenden Prüftypwerte:
-
DataBiasCheckConfig
-
ModelBiasCheckConfig
-
ModelExplainabilityCheckConfig
Der ClarifyCheck
Schritt startet einen Verarbeitungsauftrag, der den SageMaker vorgefertigten Clarify-Container ausführt und spezielle Konfigurationen für die Prüfung und den Verarbeitungsauftrag erfordert. ClarifyCheckConfig
und CheckJobConfig
sind Hilfsfunktionen für diese Konfigurationen, die darauf abgestimmt sind, wie der SageMaker Clarify-Verarbeitungsauftrag zur Überprüfung von Modellverzerrungen, Datenverzerrungen oder Modellerklärbarkeit berechnet. Weitere Informationen finden Sie unter Ausführung von SageMaker Clarify-Verarbeitungsaufträgen für Bias-Analyse und Erklärbarkeit.
Steuerung des Schrittverhaltens bei der Drift-Prüfung
Für diesen ClarifyCheck
Schritt sind die folgenden zwei booleschen Flags erforderlich, um sein Verhalten zu steuern:
-
skip_check
: Dieser Parameter gibt an, ob die Driftprüfung gegenüber der vorherigen Basislinie übersprungen wurde oder nicht. WennFalse
auf gesetzt ist, muss die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein. -
register_new_baseline
: Dieser Parameter gibt an, ob über die SchritteigenschaftBaselineUsedForDriftCheckConstraints
auf eine neu berechnete Basislinie zugegriffen werden kann. WennFalse
auf gesetzt ist, muss auch die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein. Darauf kann über dieBaselineUsedForDriftCheckConstraints
Eigenschaft zugegriffen werden.
Weitere Informationen finden Sie unter Basisberechnung, Abweichungserkennung und Lebenszyklus mit den QualityCheck Schritten ClarifyCheck und in Amazon SageMaker Model Building Pipelines.
Arbeiten mit Baselines
Sie können optional model_package_group_name
angeben, um die vorhandene Baseline zu finden, und der Schritt ClarifyCheck
zieht das DriftCheckBaselines
auf das letzte genehmigte Modellpaket in der Modellpaketgruppe. Oder Sie können über den supplied_baseline_constraints
Parameter eine vorherige Basislinie angeben. Wenn Sie sowohl model_package_group_name
als auch supplied_baseline_constraints
angeben, verwendet der Schritt ClarifyCheck
die durch den supplied_baseline_constraints
Parameter angegebene Basislinie.
Weitere Informationen zur Verwendung der ClarifyCheck
Schrittanforderungen finden Sie unter sagemaker.workflow.steps.ClarifyCheckStepClarifyCheck
in SageMaker Pipelines verwendet wird, finden Sie unter sagemaker-pipeline-model-monitor-clarify-steps.ipynb
Beispiel Erstellen eines ClarifyCheck
-Schrittes zur Prüfung der Datenverzerrung
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="
ml.c5.xlarge
", volume_size_in_gb=120
, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep
']), label=0, dataset_type="text/csv
", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0
], facet_name=[8
], facet_values_or_threshold=[[0.5
]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep
", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json
", model_package_group_name="MyModelPackageGroup
" )
QualityCheck Schritt
Mit dem Schritt QualityCheck
können Sie Baseline-Vorschläge und Drift-Checks gegen eine frühere Baseline für die Datenqualität oder die Modellqualität in einer Pipeline durchführen. Sie können dann Ihre Baselines mit der model.register()
-Methode erzeugen und registrieren und die Ausgabe dieser Methode mit Modell Schritt an step_args
übergeben. Model Monitor kann diese Basislinien für die Drift-Prüfung Ihrer Modellendpunkte verwenden, sodass Sie einen Basisvorschlag nicht separat erstellen müssen. Bei diesem QualityCheck
Schritt können auch Basiswerte für die Driftprüfung aus der Modellregistrierung abgerufen werden. Der QualityCheck
Schritt nutzt den vorgefertigten Container von Amazon SageMaker Model Monitor, der über eine Reihe von Modellüberwachungsfunktionen verfügt, einschließlich Einschränkungsvorschlag, Statistikgenerierung und Einschränkungsvalidierung anhand einer Baseline. Weitere Informationen finden Sie unter Vorgefertigter Container von Amazon SageMaker Model Monitor.
Konfigurieren des QualityCheck Schritts
Sie können den QualityCheck
Schritt so konfigurieren, dass bei jeder Verwendung in einer Pipeline nur einer der folgenden Prüftypen durchgeführt wird.
-
Überprüfung der Datenqualität
-
Modellqualitätsprüfung
Dazu setzen Sie den quality_check_config
Parameter mit einem der folgenden Prüftypwerte:
-
DataQualityCheckConfig
-
ModelQualityCheckConfig
In diesem QualityCheck
Schritt wird ein Verarbeitungsauftrag gestartet, der den vorgefertigten Container von Model Monitor ausführt und spezielle Konfigurationen für die Prüfung und den Verarbeitungsauftrag erfordert. Bei den QualityCheckConfig
und CheckJobConfig
handelt es sich um Hilfsfunktionen für diese Konfigurationen, die darauf abgestimmt sind, wie Model Monitor eine Grundlage für die Überwachung der Modell- oder Datenqualität erstellt. Weitere Informationen zu den Basisvorschlägen von Model Monitor finden Sie unter Erstellen einer Baseline undErstellen Sie eine Basislinie für die Modellqualität.
Steuern des Schrittverhaltens bei der Drift-Prüfung
Für diesen QualityCheck
Schritt sind die folgenden zwei booleschen Flags erforderlich, um sein Verhalten zu steuern:
-
skip_check
: Dieser Parameter gibt an, ob die Driftprüfung gegenüber der vorherigen Basislinie übersprungen wurde oder nicht. WennFalse
auf gesetzt ist, muss die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein. -
register_new_baseline
: Dieser Parameter gibt an, ob auf eine neu berechnete Basislinie über die SchritteigenschaftenBaselineUsedForDriftCheckConstraints
undBaselineUsedForDriftCheckStatistics
. Ist sie aufFalse
eingestellt, muss auch die vorherige Baseline der konfigurierten Prüfart verfügbar sein. Auf diese kann über die EigenschaftenBaselineUsedForDriftCheckConstraints
undBaselineUsedForDriftCheckStatistics
zugegriffen werden.
Weitere Informationen finden Sie unter Basisberechnung, Abweichungserkennung und Lebenszyklus mit den QualityCheck Schritten ClarifyCheck und in Amazon SageMaker Model Building Pipelines.
Arbeiten mit Baselines
Sie können eine vorherige Basislinie direkt über die Parameter supplied_baseline_statistics
und supplied_baseline_constraints
angeben, oder Sie können einfach das model_package_group_name
angeben und der QualityCheck
Schritt ruft das DriftCheckBaselines
auf dem letzten genehmigten Modellpaket in der Modellpaketgruppe ab. Wenn Sie, das model_package_group_name
, das supplied_baseline_constraints
und supplied_baseline_statistics
angeben, verwendet der QualityCheck
Schritt die Baseline, die durch supplied_baseline_constraints
und supplied_baseline_statistics
für den Prüftyp des QualityCheck
Schritts, den Sie gerade ausführen, festgelegt wurde.
Weitere Informationen zur Verwendung der QualityCheck
Schrittanforderungen finden Sie unter sagemaker.workflow.steps.QualityCheckStepQualityCheck
in SageMaker Pipelines verwendet wird, finden Sie unter sagemaker-pipeline-model-monitor-clarify-steps.ipynb
Beispiel Erstellen eines QualityCheck
-Schrittes zur Prüfung der Datenqualität
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="
ml.c5.xlarge
", volume_size_in_gb=120
, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train
"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json
", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json
", model_package_group_name="MyModelPackageGroup
" )
EMR-Schritt
Sie können den EMR-Schritt Amazon SageMaker Model Building Pipelines verwenden, um Amazon-EMR-Schritte auf einem laufenden Amazon-EMR-Cluster zu verarbeiten, oder die Pipeline einen Amazon-EMR-Cluster für Sie erstellen und verwalten lassen. Weitere Informationen über Amazon EMR finden Sie unter Erste Schritte mit Amazon EMR.
Der EMR-Schritt erfordert, dass EMRStepConfig
den Speicherort der JAR-Datei, die vom Amazon EMR-Cluster verwendet werden soll, und alle zu übergebenden Argumente enthält. Sie geben auch die Amazon EMR-Cluster-ID an, wenn Sie den Schritt auf einem laufenden EMR-Cluster ausführen möchten, oder die Cluster-Konfiguration, wenn der EMR-Schritt auf einem Cluster ausgeführt werden soll, den er für Sie erstellt, verwaltet und beendet. Die folgenden Abschnitte enthalten Beispiele und Links zu Beispiel-Notebooks, die beide Methoden demonstrieren.
Anmerkung
-
EMR-Schritte erfordern, dass die an Ihre Pipeline übergebene Rolle über zusätzliche Berechtigungen verfügt. Sie sollten die AWSverwaltete Richtlinie:
AmazonSageMakerPipelinesIntegrations
an Ihre Pipeline-Rolle anhängen oder sicherstellen, dass die Rolle die in dieser Richtlinie enthaltenen Berechtigungen umfasst. -
EMR Step wird weder auf EMR Serverless noch auf Amazon EMR on EKS unterstützt.
-
Wenn Sie einen EMR-Schritt auf einem laufenden Cluster verarbeiten, können Sie nur einen Cluster verwenden, der sich in einem der folgenden Zustände befindet:
STARTING
,BOOTSTRAPPING
,RUNNING
, oderWAITING
. -
Wenn Sie EMR-Schritte in einem laufenden Cluster verarbeiten, können Sie maximal 256 EMR-Schritte in einem
PENDING
Status auf einem EMR-Cluster haben. EMR-Schritte, die über diesen Grenzwert hinaus eingereicht werden, führen zu einem Fehler bei der Pipeline-Ausführung. Sie können auch Richtlinie für Pipeline-Schritte erneut versuchen verwenden. Sie können entweder Cluster-ID oder Cluster-Konfiguration angeben, aber nicht beides.
Der EMR-Schritt basiert darauf EventBridge , dass Amazon Änderungen im EMR-Schritt oder Cluster-Status überwacht. Wenn Sie Ihren Amazon EMR-Auftrag auf einem laufenden Cluster verarbeiten, verwendet der EMR-Schritt die
SageMakerPipelineExecutionEMRStepStatusUpdateRule
Regel zur Überwachung des EMR-Schrittstatus. Wenn Sie Ihren Auftrag auf einem Cluster verarbeiten, den der EMR-Schritt für Sie erstellt, verwendet der Schritt dieSageMakerPipelineExecutionEMRClusterStatusRule
Regel, um Änderungen im Clusterstatus zu überwachen. Wenn Sie eine dieser EventBridge Regeln in Ihrem AWS Konto sehen, löschen Sie sie nicht, andernfalls wird Ihr EMR-Schritt möglicherweise nicht abgeschlossen.
Starten Sie einen neuen Auftrag auf einem laufenden Amazon EMR-Cluster
Wenn Sie einen neuen Auftrag auf einem laufenden Amazon EMR-Cluster starten möchten, übergeben Sie die Cluster-ID als Zeichenfolge an das cluster_id
Argument von EMRStep
. Das folgende Beispiel veranschaulicht diese Vorgehensweise.
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="
jar-location
", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )
Ein Beispiel-Notebook, das Sie durch ein vollständiges Beispiel führt, finden Sie unter SageMaker Pipelines EMR Step with Running EMR Cluster.
Starten Sie einen neuen Auftrag in einem neuen Amazon EMR-Cluster
Wenn Sie einen neuen Auftrag auf einem neuen Cluster starten möchten, den für Sie EMRStep
erstellt, geben Sie Ihre Clusterkonfiguration als Wörterbuch mit derselben Struktur wie eine -RunJobFlowAnforderung an. Nehmen Sie jedoch nicht die folgenden Felder in Ihre Clusterkonfiguration auf:
[
Name
][
Steps
][
AutoTerminationPolicy
][
Instances
][KeepJobFlowAliveWhenNoSteps
][
Instances
][TerminationProtected
]
Alle anderen RunJobFlow
Argumente können in Ihrer Clusterkonfiguration verwendet werden. Weitere Informationen zur Anforderungssyntax finden Sie unter RunJobFlow.
Im folgenden Beispiel wird eine Clusterkonfiguration an eine EMR-Schrittdefinition übergeben, die den Schritt auffordert, einen neuen Auftrag auf einem neuen EMR-Cluster zu starten. Die EMR-Clusterkonfiguration in diesem Beispiel umfasst Spezifikationen für primäre und zentrale EMR-Clusterknoten. Weitere Informationen zu Amazon EMR-Knotentypen finden Sie unter Grundlegendes zu Knotentypen: Primär-, Kern- und Aufgabenknoten.
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_step_config = EMRStepConfig( jar="
jar-location
", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) # include your cluster configuration as a dictionary emr_cluster_config = { "Applications": [ { "Name": "Spark", } ], "Instances":{ "InstanceGroups":[ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge" }, { "InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge" } ] }, "BootstrapActions":[], "ReleaseLabel": "emr-6.6.0", "JobFlowRole": "job-flow-role
", "ServiceRole": "service-role
" } emr_step = EMRStep( name="emr-step", cluster_id=None, display_name="emr_step", description="MyEMRStepDescription", step_config=emr_step_config, cluster_config=emr_cluster_config )
Ein Beispiel-Notebook, das Sie durch ein vollständiges Beispiel führt, finden Sie unter SageMaker Pipelines EMR Step with Cluster Lifecycle Management.
Schritt „Notebook-Auftrag“
Verwenden Sie eine NotebookJobStep
, um Ihren SageMaker Notebook-Auftrag nicht interaktiv als Pipeline-Schritt auszuführen. Weitere Informationen zu SageMaker Notebook-Aufträgen finden Sie unter SageMaker Notebook-Aufträge.
Ein NotebookJobStep
erfordert mindestens ein Eingabe-Notebook, einen Image-URI und einen Kernel-Namen. Weitere Informationen zu den Schrittanforderungen für Notebook-Aufträge und anderen Parametern, die Sie festlegen können, um Ihren Schritt anzupassen, finden Sie unter sagemaker.workflow.steps.NotebookJobStep
Im folgenden Beispiel werden Mindestargumente verwendet, um einen zu definierenNotebookJobStep
.
from sagemaker.workflow.notebook_job_step import NotebookJobStep notebook_job_step = NotebookJobStep( input_notebook=
input_notebook
, image_uri=image_uri
, kernel_name=kernel_name
)
Ihr NotebookJobStep
Pipeline-Schritt wird als SageMaker Notebook-Auftrag behandelt, sodass Sie den Ausführungsstatus im Studio Classic UI-Notebook-Auftrags-Dashboard verfolgen können, wenn Sie bestimmte Tags mit dem tags
-Argument einschließen. Weitere Informationen zum Einschließen von Tags finden Sie unter Anzeigen Ihrer Notebook-Aufträge im Studio-UI-Dashboard.
Wenn Sie Ihren Notebook-Auftrag mit dem SageMaker Python SDK planen, können Sie außerdem nur bestimmte Bilder angeben, um Ihren Notebook-Auftrag auszuführen. Weitere Informationen finden Sie unter Image-Einschränkungen für SageMaker Python-SDK-Notebook-Aufträge.
Schritt fehlschlagen
Sie verwenden eine FailStep
, um eine Ausführung von Amazon SageMaker Model Building Pipelines zu stoppen, wenn eine gewünschte Bedingung oder ein gewünschter Status nicht erreicht wird, und um die Ausführung dieser Pipeline als fehlgeschlagen zu markieren. In der FailStep
können Sie auch eine benutzerdefinierte Fehlermeldung eingeben, die die Ursache für den Ausführungsfehler der Pipeline angibt.
Anmerkung
Wenn ein FailStep
und andere Pipeline-Schritte gleichzeitig ausgeführt werden, wird die Pipeline erst beendet, wenn alle gleichzeitigen Schritte abgeschlossen sind.
Einschränkungen bei der Verwendung FailStep
-
Sie können ein
FailStep
nicht in dieDependsOn
-Liste anderer Schritte aufnehmen. Weitere Informationen finden Sie unter Benutzerdefinierte Abhängigkeit zwischen Schritten. -
Andere Schritte können sich nicht auf das
FailStep
beziehen. Dies ist immer der letzte Schritt bei der Ausführung einer Pipeline. -
Sie können eine Pipeline-Ausführung, die mit einem
FailStep
endet, nicht wiederholen.
Sie können die FailStep
ErrorMessage
in Form einer statischen Textzeichenfolge erstellen. Alternativ können Sie auch Pipeline-Parameter, eine Join
Der folgende Beispielcodeausschnitt verwendet eine FailStep
mit Pipelineparametern ErrorMessage
konfigurierte Option und eine Join
Operation.
from sagemaker.workflow.fail_step import FailStep from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5) step_fail = FailStep( name="
AbaloneMSEFail
", error_message=Join( on=" ", values=["Execution failed due to MSE >
", mse_threshold_param] ), )
Eigenschaften des Schritts
Das properties
Attribut wird verwendet, um Datenabhängigkeiten zwischen Schritten in der Pipeline hinzuzufügen. Diese Datenabhängigkeiten werden dann von SageMaker Pipelines verwendet, um die DAG aus der Pipeline-Definition zu erstellen. Diese Eigenschaften können als Platzhalterwerte referenziert werden und werden zur Laufzeit aufgelöst.
Das -properties
Attribut eines SageMaker Pipelines-Schritts entspricht dem -Objekt, das von einem -Describe
Aufruf für den entsprechenden SageMaker Auftragstyp zurückgegeben wird. Für jeden Auftragstyp gibt der Describe
Aufruf das folgende Antwortobjekt zurück:
-
ProcessingStep
– DescribeProcessingJob -
TrainingStep
– DescribeTrainingJob -
TransformStep
– DescribeTransformJob
Informationen dazu, welche Eigenschaften während der Erstellung der Datenabhängigkeit für jeden Schritttyp referenzierbar sind, finden Sie unter Datenabhängigkeit – Eigenschaftsreferenz
Schrittparallelität
Wenn ein Schritt nicht von einem anderen Schritt abhängt, wird er sofort nach der Ausführung der Pipeline ausgeführt. Die parallel Ausführung zu vieler Pipeline-Schritte kann jedoch schnell die verfügbaren Ressourcen erschöpfen. Steuern Sie die Anzahl der gleichzeitigen Schritte für eine Pipeline-Ausführung mit ParallelismConfiguration
.
Im folgenden Beispiel wird ParallelismConfiguration
verwendet, um die Grenze für gleichzeitige Schritte auf fünf zu setzen.
pipeline.create( parallelism_config=ParallelismConfiguration(5), )
Datenabhängigkeit zwischen Schritten
Sie definieren die Struktur Ihrer DAG, indem Sie die Datenbeziehungen zwischen den Schritten angeben. Um Datenabhängigkeiten zwischen Schritten herzustellen, übergeben Sie die Eigenschaften eines Schritts als Eingabe an einen anderen Schritt in der Pipeline. Der Schritt, der die Eingabe empfängt, wird erst gestartet, nachdem der Schritt, der die Eingabe bereitstellt, abgeschlossen ist.
Eine Datenabhängigkeit verwendet die JsonPath Notation im folgenden Format. Dieses Format durchläuft die JSON-Eigenschaftendatei, was bedeutet, dass Sie so viele<property>
Instanzen wie nötig anhängen können, um die gewünschte verschachtelte Eigenschaft in der Datei zu erreichen. Weitere Informationen zur JsonPath Notation finden Sie im JsonPath Repo
<step_name>
.properties.<property>
.<property>
Im Folgenden wird gezeigt, wie ein Amazon-S3-Bucket mithilfe der ProcessingOutputConfig
Eigenschaft eines Verarbeitungsschritts angegeben wird.
step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
Um die Datenabhängigkeit zu erstellen, übergeben Sie den Bucket wie folgt an einen Trainingsschritt.
from sagemaker.workflow.pipeline_context import PipelineSession sklearn_train = SKLearn(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="CensusTrain", step_args=sklearn_train.fit(inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri )) )
Informationen dazu, welche Eigenschaften während der Erstellung der Datenabhängigkeit für jeden Schritttyp referenzierbar sind, finden Sie unter Datenabhängigkeit – Eigenschaftsreferenz
Benutzerdefinierte Abhängigkeit zwischen Schritten
Wenn Sie eine Datenabhängigkeit angeben, stellt SageMaker Pipelines die Datenverbindung zwischen den Schritten bereit. Alternativ kann ein Schritt auf die Daten aus einem vorherigen Schritt zugreifen, ohne SageMaker Pipelines direkt zu verwenden. In diesem Fall können Sie eine benutzerdefinierte Abhängigkeit erstellen, die SageMaker Pipelines anweist, einen Schritt erst zu starten, nachdem ein anderer Schritt abgeschlossen ist. Sie erstellen eine benutzerdefinierte Abhängigkeit, indem Sie DependsOn
-Attribut eines Schritts angeben.
Im Folgenden wird beispielsweise ein Schritt C
definiert, der erst beginnt, wenn sowohl der Schritt A
als auch der Schritt B
abgeschlossen sind.
{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }
SageMaker Pipelines löst eine Validierungsausnahme aus, wenn die Abhängigkeit eine zyklische Abhängigkeit erzeugen würde.
Im folgenden Beispiel wird ein Trainingsschritt erstellt, der beginnt, nachdem ein Verarbeitungsschritt abgeschlossen ist.
processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])
Im folgenden Beispiel wird ein Trainingsschritt erstellt, der erst beginnt, wenn zwei verschiedene Verarbeitungsschritte abgeschlossen sind.
processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])
Im Folgenden wird eine alternative Methode zum Erstellen der benutzerdefinierten Abhängigkeit beschrieben.
training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])
Im folgenden Beispiel wird ein Trainingsschritt erstellt, der Eingaben von einem Verarbeitungsschritt empfängt und darauf wartet, dass ein anderer Verarbeitungsschritt abgeschlossen ist.
processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])
Das folgende Beispiel zeigt, wie eine Stringliste der benutzerdefinierten Abhängigkeiten eines Schritts abgerufen wird.
custom_dependencies = training_step.depends_on
Verwenden Sie ein benutzerdefiniertes Bild in einem Schritt
Sie können jedes der verfügbaren SageMakerDeep Learning Container-Images
Sie können auch Ihren eigenen -Container mit Pipeline-Schritten verwenden. Da Sie kein Image aus Amazon SageMaker Studio Classic erstellen können, müssen Sie Ihr Image mit einer anderen Methode erstellen, bevor Sie es mit Amazon SageMaker Model Building Pipelines verwenden.
Um bei der Erstellung der Schritte für Ihre Pipeline Ihren eigenen Container zu verwenden, fügen Sie den Bild-URI in die Estimator-Definition ein. Weitere Informationen zur Verwendung Ihres eigenen Containers mit SageMakerfinden Sie unter Verwenden von Docker-Containern mit SageMaker.