Pipeline-Schritte - Amazon SageMaker

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Pipeline-Schritte

SageMaker Pipelines bestehen aus Schritten. Diese Schritte definieren die Aktionen, die die Pipeline ausführt, und die Beziehungen zwischen den Schritten mithilfe von Eigenschaften.

Schritttypen

Im Folgenden werden die Anforderungen der einzelnen Schritttypen beschrieben und ein Beispiel für die Implementierung des Schritts bereitgestellt. Dabei handelt es sich nicht um funktionale Implementierungen, da sie nicht die benötigten Ressourcen und Eingaben bereitstellen. Ein Tutorial, das diese Schritte implementiert, finden Sie unter SageMaker Pipelines erstellen und verwalten.

Anmerkung

Sie können auch einen Schritt aus Ihrem lokalen Machine-Learning-Code erstellen, indem Sie ihn mit dem @step Decorator in einen SageMaker Pipelines-Schritt konvertieren. Weitere Informationen finden Sie unter @step Decorator.

Amazon SageMaker Model Building Pipelines unterstützen die folgenden Schritttypen:

@step Decorator

Sie können mit dem @step Decorator einen Schritt aus lokalem Machine-Learning-Code erstellen. Nachdem Sie Ihren Code getestet haben, können Sie die Funktion in einen SageMaker Pipeline-Schritt konvertieren, indem Sie sie mit dem @step Decorator kommentieren. SageMaker Pipelines erstellt und führt eine Pipeline aus, wenn Sie die Ausgabe der @step-dekorierten Funktion als Schritt an Ihre Pipeline übergeben. Sie können auch eine mehrstufige DAG-Pipeline erstellen, die eine oder mehrere von dekorierte Funktionen sowie herkömmliche SageMaker Pipeline@step-Schritte enthält. Weitere Informationen zum Erstellen eines Schritts mit @step Decorator finden Sie unter L ift-and-shift Python-Code mit dem @step Decorator.

Verarbeitungsschritt

Verwenden Sie einen Verarbeitungsschritt, um einen Verarbeitungsauftrag für die Datenverarbeitung zu erstellen. Weitere Informationen zur Verarbeitung von Auftrags finden Sie unter Daten verarbeiten und Modelle auswerten.

Ein Verarbeitungsschritt erfordert einen Prozessor, ein Python-Skript, das den Verarbeitungscode definiert, Ausgaben für die Verarbeitung und Auftrag-Argumente. Das folgende Beispiel zeigt, wie man eine ProcessingStep-Definition erstellt.

from sagemaker.sklearn.processing import SKLearnProcessor sklearn_processor = SKLearnProcessor(framework_version='1.0-1', role=<role>, instance_type='ml.m5.xlarge', instance_count=1)
from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep inputs = [ ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] step_process = ProcessingStep( name="AbaloneProcess", step_args = sklearn_processor.run(inputs=inputs, outputs=outputs, code="abalone/preprocessing.py") )

Übergeben Sie Laufzeitparameter

Das folgende Beispiel zeigt, wie Laufzeitparameter von einem PySpark Prozessor an einen übergeben werdenProcessingStep.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.spark.processing import PySparkProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep pipeline_session = PipelineSession() pyspark_processor = PySparkProcessor( framework_version='2.4', role=<role>, instance_type='ml.m5.xlarge', instance_count=1, sagemaker_session=pipeline_session, ) step_args = pyspark_processor.run( inputs=[ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),], outputs=[ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ], code="preprocess.py", arguments=None, ) step_process = ProcessingStep( name="AbaloneProcess", step_args=step_args, )

Weitere Informationen zu den Anforderungen an Verarbeitungsschritte finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.ProcessingStep Ein ausführliches Beispiel finden Sie unter Definieren eines Verarbeitungsschritts für Feature Engineering im Beispiel-Notebook Orchestrieren von Aufträgen zum Trainieren und Auswerten von Modellen mit Amazon SageMaker Pipelines.

Trainingsschritt

Sie verwenden einen Trainingsschritt, um einen Schulungsauftrag zum Trainieren eines Modells zu erstellen. Weitere Informationen zu Schulungsaufträgen finden Sie unter Trainieren eines Modells mit Amazon SageMaker.

Ein Trainingsschritt erfordert einen Schätzer sowie Eingaben von Trainings- und Validierungsdaten. Das folgende Beispiel zeigt, wie Sie eine TrainingStep-Definition erstellen. Weitere Informationen zu den Anforderungen an Trainingsschritte finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.TrainingStep

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TrainingStep from sagemaker.xgboost.estimator import XGBoost pipeline_session = PipelineSession() xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session) step_args = xgb_estimator.fit( inputs={ "train": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train" ].S3Output.S3Uri, content_type="text/csv" ), "validation": TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "validation" ].S3Output.S3Uri, content_type="text/csv" ) } ) step_train = TrainingStep( name="TrainAbaloneModel", step_args=step_args, )

Optimierungsschritt

Sie verwenden einen Optimierungsschritt, um einen Hyperparameter-Tuning-Auftrag zu erstellen, der auch als Hyperparameter-Optimierung (HPO) bezeichnet wird. Ein Hyperparameter-Optimierungsauftrag führt mehrere Trainingsauftrags aus, von denen jeder eine Modellversion erzeugt. Weitere Informationen zur Abstimmung der Hyperparameter finden Sie unter Durchführen der automatischen Modelloptimierung mit SageMaker.

Der Optimierungsauftrag ist mit dem SageMaker Experiment für die Pipeline verknüpft, wobei die Trainingsaufträge als Tests erstellt wurden. Weitere Informationen finden Sie unter Integration von Experimenten.

Ein Optimierungsschritt erfordert einen HyperparameterTuner und Trainingseingaben. Sie können frühere Abstimmungsaufträge erneut trainieren, indem Sie den warm_start_config-Parameter des HyperparameterTuner angeben. Weitere Informationen zur Hyperparameteroptimierung und zum Warmstart finden Sie unter Durchführen eines Hyperparameter-Optimierungsauftrags mit Warmstart.

Sie verwenden die Methode get_top_model_s3_uri der Klasse sagemaker.workflow.steps.TuningStep, um das Modellartefakt aus einer der leistungsstärksten Modellversionen abzurufen. Ein Notebook, das zeigt, wie Sie einen Optimierungsschritt in einer SageMaker Pipeline verwenden, finden Sie unter sagemaker-pipelines-tuning-step.ipynb.

Wichtig

Optimierungsschritte wurden in Amazon SageMaker Python SDK v2.48.0 und Amazon SageMaker Studio Classic v3.8.0 eingeführt. Sie müssen Studio Classic aktualisieren, bevor Sie einen Optimierungsschritt verwenden, sonst wird die Pipeline-DAG nicht angezeigt. Informationen zum Aktualisieren von Studio Classic finden Sie unter Studio SageMaker Classic herunterfahren und aktualisieren.

Das folgende Beispiel zeigt, wie man eine TuningStep-Definition erstellt.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.tuner import HyperparameterTuner from sagemaker.inputs import TrainingInput from sagemaker.workflow.steps import TuningStep tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession()) step_tuning = TuningStep( name = "HPTuning", step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://my-bucket/my-data")) )

Holen Sie sich die beste Modellversion

Das folgende Beispiel zeigt, wie Sie mit der get_top_model_s3_uri Methode die beste Modellversion aus dem Tuning-Auftrag abrufen können. Die 50 leistungsstärksten Versionen sind höchstens verfügbar, geordnet nach HyperParameterTuningJobObjective. Das Argument top_k ist ein Index für die Versionen, wobei top_k=0 die leistungsstärkste und top_k=49 die leistungsschwächste Version ist.

best_model = Model( image_uri=image_uri, model_data=step_tuning.get_top_model_s3_uri( top_k=0, s3_bucket=sagemaker_session.default_bucket() ), ... )

Weitere Informationen zu den Anforderungen an Optimierungsschritte finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.TuningStep

AutoML-Schritt

Verwenden Sie die AutoML-API, um einen AutoML-Auftrag zum automatischen Trainieren eines Modells zu erstellen. Weitere Informationen zu AutoML-Aufträgen finden Sie unter Automatisieren der Modellentwicklung mit Amazon SageMaker Autopilot.

Anmerkung

Derzeit unterstützt der AutoML-Schritt nur den Ensembling-Trainingsmodus.

Das folgende Beispiel zeigt, wie eine Definition mit AutoMLStep erstellt werden kann.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.workflow.automl_step import AutoMLStep pipeline_session = PipelineSession() auto_ml = AutoML(..., role="<role>", target_attribute_name="my_target_attribute_name", mode="ENSEMBLING", sagemaker_session=pipeline_session) input_training = AutoMLInput( inputs="s3://my-bucket/my-training-data", target_attribute_name="my_target_attribute_name", channel_type="training", ) input_validation = AutoMLInput( inputs="s3://my-bucket/my-validation-data", target_attribute_name="my_target_attribute_name", channel_type="validation", ) step_args = auto_ml.fit( inputs=[input_training, input_validation] ) step_automl = AutoMLStep( name="AutoMLStep", step_args=step_args, )

Holen Sie sich die beste Modellversion

Der AutoML-Schritt trainiert automatisch mehrere Modellkandidaten. Sie können das Modell mit der besten Zielmetrik aus dem AutoML-Auftrag abrufen, indem Sie die get_best_auto_ml_model Methode und ein IAM role verwenden, um wie folgt auf Modellartefakte zuzugreifen.

best_model = step_automl.get_best_auto_ml_model(role=<role>)

Weitere Informationen finden Sie im AutoML-Schritt im SageMaker Python SDK.

Modell Schritt

Verwenden Sie eine ModelStep, um ein SageMaker Modell zu erstellen oder zu registrieren. Weitere Informationen zu denModelStep Anforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Model_Step.ModelStep

Erstellen eines Modells

Sie können ein verwendenModelStep, um ein SageMaker Modell zu erstellen. Ein ModelStep erfordert Modellartefakte und Informationen über den SageMaker Instance-Typ, den Sie zum Erstellen des Modells verwenden müssen. Weitere Informationen zu SageMaker Modellen finden Sie unter Trainieren eines Modells mit Amazon SageMaker.

Das folgende Beispiel zeigt, wie man eine ModelStep-Definition erstellt.

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.model import Model from sagemaker.workflow.model_step import ModelStep step_train = TrainingStep(...) model = Model( image_uri=pytorch_estimator.training_image_uri(), model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, sagemaker_session=PipelineSession(), role=role, ) step_model_create = ModelStep( name="MyModelCreationStep", step_args=model.create(instance_type="ml.m5.xlarge"), )

Registrieren eines Modells

Sie können eine sagemaker.model.Model oder eine bei sagemaker.pipeline.PipelineModel der Amazon- SageMaker Modellregistrierung ModelStep registrieren. Ein PipelineModel stellt eine Inferenzpipeline dar, ein Modell, das aus einer linearen Abfolge von Containern besteht, die Inferenzanforderungen verarbeiten. Weitere Informationen über die Registrierung eines Modells finden Sie unter Modelle mit Model Registry registrieren und bereitstellen.

Das folgende Beispiel zeigt, wie Sie eine ModelStep erstellen, die eine PipelineModel registriert.

import time from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel pipeline_session = PipelineSession() code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel( model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=pipeline_session, py_version='py3' ) xgboost_model = XGBoostModel( model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=pipeline_session, role=role ) from sagemaker.workflow.model_step import ModelStep from sagemaker import PipelineModel pipeline_model = PipelineModel( models=[sklearn_model, xgboost_model], role=role,sagemaker_session=pipeline_session, ) register_model_step_args = pipeline_model.register( content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', ) step_model_registration = ModelStep( name="AbaloneRegisterModel", step_args=register_model_step_args, )

CreateModel Schritt

Wichtig

Wir empfehlen die Verwendung von Modell Schritt, um Modelle ab v2.90.0 des SageMaker Python SDK zu erstellen. funktioniert CreateModelStep weiterhin in früheren Versionen des SageMaker Python SDK, wird aber nicht mehr aktiv unterstützt.

Sie verwenden einen CreateModel Schritt, um ein SageMaker Modell zu erstellen. Weitere Informationen zu SageMaker Modellen finden Sie unter Trainieren eines Modells mit Amazon SageMaker.

Ein Schritt zum Erstellen eines Modells erfordert Modellartefakte und Informationen über den SageMaker Instance-Typ, den Sie zum Erstellen des Modells verwenden müssen. Das folgende Beispiel zeigt, wie Sie eine CreateModel-Schrittdefinition erstellen. Weitere Informationen zuCreateModel den Schrittanforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.CreateModelStep

from sagemaker.workflow.steps import CreateModelStep step_create_model = CreateModelStep( name="AbaloneCreateModel", model=best_model, inputs=inputs )

RegisterModel Schritt

Wichtig

Wir empfehlen, zu verwendenModell Schritt, um Modelle ab v2.90.0 des SageMaker Python SDK zu registrieren. funktioniert RegisterModel weiterhin in früheren Versionen des SageMaker Python SDK, wird aber nicht mehr aktiv unterstützt.

Sie verwenden einen RegisterModel Schritt, um einen Sagemaker.model.Model oder eine Sagemaker.Pipeline beiPipelineModel der Amazon- SageMaker Modellregistrierung zu registrieren. Ein PipelineModel stellt eine Inferenzpipeline dar, ein Modell, das aus einer linearen Abfolge von Containern besteht, die Inferenzanforderungen verarbeiten.

Weitere Informationen über die Registrierung eines Modells finden Sie unter Modelle mit Model Registry registrieren und bereitstellen. Weitere Informationen zu RegisterModel den Schrittanforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Step_Sammlungen.RegisterModel

Das folgende Beispiel zeigt, wie Sie einen Schritt RegisterModel erstellen, der eine PipelineModel registriert.

import time from sagemaker.sklearn import SKLearnModel from sagemaker.xgboost import XGBoostModel code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix) sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri, entry_point='inference.py', source_dir='sklearn_source_dir/', code_location=code_location, framework_version='1.0-1', role=role, sagemaker_session=sagemaker_session, py_version='py3') xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts, entry_point='inference.py', source_dir='xgboost_source_dir/', code_location=code_location, framework_version='0.90-2', py_version='py3', sagemaker_session=sagemaker_session, role=role) from sagemaker.workflow.step_collections import RegisterModel from sagemaker import PipelineModel pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session) step_register = RegisterModel( name="AbaloneRegisterModel", model=pipeline_model, content_types=["application/json"], response_types=["application/json"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name='sipgroup', )

Wenn model nicht angegeben, benötigt der Registermodellschritt einen Schätzer, wie im folgenden Beispiel gezeigt.

from sagemaker.workflow.step_collections import RegisterModel step_register = RegisterModel( name="AbaloneRegisterModel", estimator=xgb_train, model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts, content_types=["text/csv"], response_types=["text/csv"], inference_instances=["ml.t2.medium", "ml.m5.xlarge"], transform_instances=["ml.m5.xlarge"], model_package_group_name=model_package_group_name, approval_status=model_approval_status, model_metrics=model_metrics )

Transformationsschritt

Sie verwenden einen Transformationsschritt für die Batch-Transformation, um die Inferenz für einen gesamten Datensatz durchzuführen. Weitere Informationen zur Batch-Transformation finden Sie unter Ausführen von Stapeltransformationen mit Inferenz-Pipelines.

Ein Transformationsschritt erfordert einen Transformator und die Daten, für die die Batch-Transformation ausgeführt werden soll. Das folgende Beispiel zeigt, wie Sie eine Transform-Schrittdefinition erstellen. Weitere Informationen zuTransform den Schrittanforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Steps.TransformStep

from sagemaker.workflow.pipeline_context import PipelineSession from sagemaker.transformer import Transformer from sagemaker.inputs import TransformInput from sagemaker.workflow.steps import TransformStep transformer = Transformer(..., sagemaker_session=PipelineSession()) step_transform = TransformStep( name="AbaloneTransform", step_args=transformer.transform(data="s3://my-bucket/my-data"), )

Bedingungsschritt

Sie verwenden einen Bedingungsschritt, um den Zustand der Schritteigenschaften zu bewerten, um zu beurteilen, welche Maßnahme als Nächstes in der Pipeline ergriffen werden sollte.

Ein Bedingungsschritt erfordert eine Liste von Bedingungen, eine Liste von Schritten, die ausgeführt werden sollen, wenn die Bedingung zu true ausgewertet wird, und eine Liste von Schritten, die ausgeführt werden sollen, wenn die Bedingung zu false ausgewertet wird. Das folgende Beispiel zeigt, wie Sie eine ConditionStep-Definition erstellen.

Einschränkungen

  • SageMaker Pipelines unterstützt nicht die Verwendung verschachtelter Bedingungsschritte. Sie können einen Bedingungsschritt nicht als Eingabe für einen anderen Bedingungsschritt übergeben.

  • Ein Bedingungsschritt kann nicht identische Schritte in beiden Zweigen verwenden. Wenn Sie in beiden Zweigen dieselbe Schrittfunktionalität benötigen, duplizieren Sie den Schritt und geben Sie ihm einen anderen Namen.

from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.functions import JsonGet cond_lte = ConditionLessThanOrEqualTo( left=JsonGet( step_name=step_eval.name, property_file=evaluation_report, json_path="regression_metrics.mse.value" ), right=6.0 ) step_cond = ConditionStep( name="AbaloneMSECond", conditions=[cond_lte], if_steps=[step_register, step_create_model, step_transform], else_steps=[] )

Weitere Informationen zu denConditionStep Anforderungen finden Sie unter sagemaker.workflow.condition_step.ConditionStep API-Referenz. Weitere Informationen zu unterstützten Bedingungen finden Sie unter Amazon SageMaker Model Building Pipelines – Bedingungen in der Python-SDK-Dokumentation. SageMaker

Rückrufschritt

Sie verwenden einen Callback Schritt, um zusätzliche Prozesse und AWS Services in Ihren Workflow zu integrieren, die nicht direkt von Amazon SageMaker Model Building Pipelines bereitgestellt werden. Wenn ein Callback Schritt ausgeführt wird, erfolgt das folgende Verfahren:

  • SageMaker Pipelines sendet eine Nachricht an eine vom Kunden angegebene Amazon Simple Queue Service (Amazon SQS)-Warteschlange. Die Nachricht enthält ein von SageMaker Pipelines generiertes Token und eine vom Kunden bereitgestellte Liste der Eingabeparameter. Nach dem Senden der Nachricht SageMaker wartetPipelines auf eine Antwort vom Kunden.

  • Der Kunde ruft die Nachricht aus der Amazon-SQS-Warteschlange ab und startet seinen benutzerdefinierten Prozess.

  • Wenn der Vorgang abgeschlossen ist, ruft der Kunde eine der folgenden APIs auf und sendet das von SageMaker Pipelines generierte Token:

  • Der API-Aufruf bewirkt, dass SageMaker Pipelines entweder den Pipeline-Prozess fortsetzen oder den Prozess fehlschlagen.

Weitere Informationen zu Callback den Schrittanforderungen finden Sie in der Dokumentation zu SageMaker.Workflow.Callback_Step.CallbackStep Eine vollständige Lösung finden Sie unter SageMaker Pipelines erweitern, um benutzerdefinierte Schritte mithilfe von Rückrufschritten einzubeziehen.

Wichtig

Callback -Schritte wurden in Amazon SageMaker Python SDK v2.45.0 und Amazon SageMaker Studio Classic v3.6.2 eingeführt. Sie müssen Studio Classic aktualisieren, bevor Sie einen Callback Schritt verwenden, sonst wird die Pipeline-DAG nicht angezeigt. Informationen zum Aktualisieren von Studio Classic finden Sie unter Studio SageMaker Classic herunterfahren und aktualisieren.

Das folgende Beispiel zeigt eine Implementierung des vorherigen Verfahrens.

from sagemaker.workflow.callback_step import CallbackStep step_callback = CallbackStep( name="MyCallbackStep", sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue", inputs={...}, outputs=[...] ) callback_handler_code = ' import boto3 import json def handler(event, context): sagemaker_client=boto3.client("sagemaker") for record in event["Records"]: payload=json.loads(record["body"]) token=payload["token"] # Custom processing # Call SageMaker to complete the step sagemaker_client.send_pipeline_execution_step_success( CallbackToken=token, OutputParameters={...} ) '
Anmerkung

Die Ausgabeparameter für CallbackStep sollten nicht verschachtelt sein. Wenn Sie beispielsweise ein verschachteltes Wörterbuch als Ausgabeparameter verwenden, wird das Wörterbuch als eine einzelne Zeichenfolge behandelt (z. B. {"output1": "{\"nested_output1\":\"my-output\"}"}). Wenn Sie einen verschachtelten Wert angeben und versuchen, auf einen bestimmten Ausgabeparameter zu verweisen, wird ein Client-Fehler ausgelöst, der nicht erneut versucht werden kann.

Verhalten wird gestoppt

Ein Pipelineprozess wird nicht gestoppt, während ein Callback Schritt ausgeführt wird.

Wenn Sie StopPipelineExecution für einen Pipeline-Prozess mit einem laufenden Callback Schritt aufrufen, sendet SageMaker Pipelines eine zusätzliche Amazon SQS-Nachricht an die angegebene SQS-Warteschlange. Der Hauptteil der SQS-Nachricht enthält ein Statusfeld, das auf Stopping gesetzt ist. Im Folgenden wird ein Beispiel für einen SQS-Nachrichtenkörper gezeigt.

{ "token": "26vcYbeWsZ", "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a", "arguments": { "number": 5, "stringArg": "some-arg", "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv" }, "status": "Stopping" }

Sie sollten Ihrem Amazon SQS-Nachrichtenverbraucher Logik hinzufügen, um nach Erhalt der Nachricht alle erforderlichen Maßnahmen (z. B. Ressourcenbereinigung) zu ergreifen, gefolgt von einem Aufruf von SendPipelineExecutionStepSuccess oder SendPipelineExecutionStepFailure.

Nur wenn SageMaker Pipelines einen dieser Aufrufe erhält, wird der Pipeline-Prozess gestoppt.

Lambda-Schritt

Sie verwenden einen Lambda-Schritt, um eine AWS Lambda Funktion auszuführen. Sie können eine vorhandene Lambda-Funktion ausführen oder eine neue Lambda-Funktion SageMaker erstellen und ausführen. Ein Notebook, das zeigt, wie ein Lambda-Schritt in einer SageMaker Pipeline verwendet wird, finden Sie unter sagemaker-pipelines-lambda-step.ipynb.

Wichtig

Lambda-Schritte wurden in Amazon SageMaker Python SDK v2.51.0 und Amazon SageMaker Studio Classic v3.9.1 eingeführt. Sie müssen Studio Classic aktualisieren, bevor Sie einen Lambda-Schritt verwenden, sonst wird die Pipeline-DAG nicht angezeigt. Informationen zum Aktualisieren von Studio Classic finden Sie unter Studio SageMaker Classic herunterfahren und aktualisieren.

SageMaker stellt die Klasse sagemaker.lambda_helper.Lambda bereit, um Lambda-Funktionen zu erstellen, zu aktualisieren, aufzurufen und zu löschen. Lambda hat die folgende Signatur.

Lambda( function_arn, # Only required argument to invoke an existing Lambda function # The following arguments are required to create a Lambda function: function_name, execution_role_arn, zipped_code_dir, # Specify either zipped_code_dir and s3_bucket, OR script s3_bucket, # S3 bucket where zipped_code_dir is uploaded script, # Path of Lambda function script handler, # Lambda handler specified as "lambda_script.lambda_handler" timeout, # Maximum time the Lambda function can run before the lambda step fails ... )

Die Klasse sagemaker.workflow.lambda_step.LambdaStep hat ein lambda_func Argument vom Typ Lambda. Um eine bestehende Lambda-Funktion aufzurufen, muss nur der Amazon Resource Name (ARN) der Funktion an function_arn übergeben werden. Wenn Sie keinen Wert für function_arn angeben, müssen Sie handler und eine der folgenden Angaben machen:

  • zipped_code_dir— Der Pfad der komprimierten Lambda-Funktion

    s3_bucket— Amazon S3-Bucket, wo zipped_code_dir hochgeladen werden soll

  • script— Der Pfad der Lambda-Funktionsskriptdatei

Das folgende Beispiel zeigt, wie eine Lambda Schrittdefinition erstellt wird, die eine vorhandene Lambda-Funktion aufruft.

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda" ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

Das folgende Beispiel zeigt, wie Sie eine Lambda Schrittdefinition erstellen, die mithilfe eines Lambda-Funktionsskripts eine Lambda-Funktion erstellt und aufruft.

from sagemaker.workflow.lambda_step import LambdaStep from sagemaker.lambda_helper import Lambda step_lambda = LambdaStep( name="ProcessingLambda", lambda_func=Lambda( function_name="split-dataset-lambda", execution_role_arn=execution_role_arn, script="lambda_script.py", handler="lambda_script.lambda_handler", ... ), inputs={ s3_bucket = s3_bucket, data_file = data_file }, outputs=[ "train_file", "test_file" ] )

Eingaben und Ausgaben

Wenn Ihre Lambda Funktion Eingaben oder Ausgaben hat, müssen diese ebenfalls in Ihrem Schritt definiert werden. Lambda

Anmerkung

Eingabe- und Ausgabeparameter sollten nicht verschachtelt sein. Wenn Sie beispielsweise ein verschachteltes Wörterbuch als Ausgabeparameter verwenden, wird das Wörterbuch als eine einzelne Zeichenfolge behandelt (z. B. {"output1": "{\"nested_output1\":\"my-output\"}"}). Wenn Sie einen verschachtelten Wert angeben und später versuchen, darauf zu verweisen, wird ein Client-Fehler ausgegeben, der nicht erneut versucht werden kann.

Bei der Definition des Lambda Schritts inputs muss es sich um ein Wörterbuch mit Schlüssel-Wert-Paaren handeln. Jeder Wert des inputs Wörterbuchs muss ein primitiver Typ sein (Zeichenfolge, Ganzzahl oder Gleitkommazahl). Verschachtelte Objekte werden nicht unterstützt. Bleibt der Wert für inputs undefiniert, wird der Wert für None verwendet.

Der outputs Wert muss eine Liste von Schlüsseln sein. Diese Schlüssel beziehen sich auf ein Wörterbuch, das in der Ausgabe der Lambda Funktion definiert ist. Wie bei inputs müssen diese Schlüssel primitive Typen sein, und verschachtelte Objekte werden nicht unterstützt.

Timeout und Verhalten beim Stoppen

Die Lambda Klasse hat ein timeout Argument, das die maximale Zeit angibt, während der die Lambda-Funktion ausgeführt werden kann. Der Standardwert ist 120 Sekunden und der Höchstwert 10 Minuten. Wenn die Lambda-Funktion ausgeführt wird, wenn das Timeout erreicht ist, schlägt der Lambda-Schritt fehl. Die Lambda-Funktion wird jedoch weiterhin ausgeführt.

Ein Pipelineprozess kann nicht gestoppt werden, während ein Lambda-Schritt ausgeführt wird, da die durch den Lambda-Schritt aufgerufene Lambda-Funktion nicht gestoppt werden kann. Wenn Sie versuchen, den Prozess zu beenden, während die Lambda-Funktion ausgeführt wird, wartet die Pipeline, bis die Lambda-Funktion beendet ist oder bis das Timeout erreicht ist, je nachdem, was zuerst eintritt, und stoppt dann. Wenn die Lambda-Funktion beendet wird, lautet der Status des Pipeline-Prozesses Stopped. Wenn die Zeitüberschreitung erreicht ist, lautet der Status des Pipeline-Prozesses Failed.

ClarifyCheck Schritt

Sie können diesen ClarifyCheck Schritt verwenden, um die Ausgangsabweichung anhand früherer Basislinien zu überprüfen, um die Verzerrungsanalyse und die Erklärbarkeit des Modells zu verbessern. Mit der model.register() Methode können Sie dann Ihre Baselines erstellen und registrieren und die Ausgabe dieser Methode mit Modell Schritt an step_args übergeben. Diese Baselines für die Abweichungsprüfung können von Amazon SageMaker Model Monitor für Ihre Modellendpunkte verwendet werden, sodass Sie keinen separaten Baseline-Vorschlag machen müssen. Bei diesem ClarifyCheck Schritt können auch Basiswerte für die Driftprüfung aus der Modellregistrierung abgerufen werden. Der ClarifyCheck Schritt nutzt den vorgefertigten Amazon SageMaker Clarify-Container, der eine Reihe von Modellüberwachungsfunktionen bietet, einschließlich des Vorschlags für Einschränkungen und der Validierung von Einschränkungen anhand einer bestimmten Baseline. Weitere Informationen finden Sie unter Erste Schritte mit einem SageMaker Clarify-Container.

Konfigurieren des ClarifyCheck Schritts

Sie können den ClarifyCheck Schritt so konfigurieren, dass bei jeder Verwendung in einer Pipeline nur einer der folgenden Prüftypen durchgeführt wird.

  • Prüfung auf Datenverzerrung

  • Überprüfung der Modellverzerrung

  • Überprüfung der Erklärbarkeit des Modells

Dazu setzen Sie den clarify_check_config Parameter mit einem der folgenden Prüftypwerte:

  • DataBiasCheckConfig

  • ModelBiasCheckConfig

  • ModelExplainabilityCheckConfig

Der ClarifyCheck Schritt startet einen Verarbeitungsauftrag, der den SageMaker vorgefertigten Clarify-Container ausführt und spezielle Konfigurationen für die Prüfung und den Verarbeitungsauftrag erfordert. ClarifyCheckConfig und CheckJobConfig sind Hilfsfunktionen für diese Konfigurationen, die darauf abgestimmt sind, wie der SageMaker Clarify-Verarbeitungsauftrag zur Überprüfung von Modellverzerrungen, Datenverzerrungen oder Modellerklärbarkeit berechnet. Weitere Informationen finden Sie unter Ausführung von SageMaker Clarify-Verarbeitungsaufträgen für Bias-Analyse und Erklärbarkeit.

Steuerung des Schrittverhaltens bei der Drift-Prüfung

Für diesen ClarifyCheck Schritt sind die folgenden zwei booleschen Flags erforderlich, um sein Verhalten zu steuern:

  • skip_check: Dieser Parameter gibt an, ob die Driftprüfung gegenüber der vorherigen Basislinie übersprungen wurde oder nicht. Wenn False auf gesetzt ist, muss die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein.

  • register_new_baseline: Dieser Parameter gibt an, ob über die Schritteigenschaft BaselineUsedForDriftCheckConstraints auf eine neu berechnete Basislinie zugegriffen werden kann. Wenn False auf gesetzt ist, muss auch die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein. Darauf kann über die BaselineUsedForDriftCheckConstraints Eigenschaft zugegriffen werden.

Weitere Informationen finden Sie unter Basisberechnung, Abweichungserkennung und Lebenszyklus mit den QualityCheck Schritten ClarifyCheck und in Amazon SageMaker Model Building Pipelines.

Arbeiten mit Baselines

Sie können optional model_package_group_name angeben, um die vorhandene Baseline zu finden, und der Schritt ClarifyCheck zieht das DriftCheckBaselines auf das letzte genehmigte Modellpaket in der Modellpaketgruppe. Oder Sie können über den supplied_baseline_constraints Parameter eine vorherige Basislinie angeben. Wenn Sie sowohl model_package_group_name als auch supplied_baseline_constraints angeben, verwendet der Schritt ClarifyCheck die durch den supplied_baseline_constraints Parameter angegebene Basislinie.

Weitere Informationen zur Verwendung der ClarifyCheck Schrittanforderungen finden Sie unter sagemaker.workflow.steps.ClarifyCheckStep im Amazon SageMaker SageMaker SDK for Python . Ein Amazon SageMaker Studio Classic-Notebook, das zeigt, wie Schritt ClarifyCheck in SageMaker Pipelines verwendet wird, finden Sie unter sagemaker-pipeline-model-monitor-clarify-steps.ipynb.

Beispiel Erstellen eines ClarifyCheck-Schrittes zur Prüfung der Datenverzerrung
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_bias_data_config = DataConfig( s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']), label=0, dataset_type="text/csv", s3_analysis_config_output_path=data_bias_analysis_cfg_output_path, ) data_bias_config = BiasConfig( label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]] ) data_bias_check_config = DataBiasCheckConfig( data_config=data_bias_data_config, data_bias_config=data_bias_config, )h data_bias_check_step = ClarifyCheckStep( name="DataBiasCheckStep", clarify_check_config=data_bias_check_config, check_job_config=check_job_config, skip_check=False, register_new_baseline=False supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json", model_package_group_name="MyModelPackageGroup" )

QualityCheck Schritt

Mit dem Schritt QualityCheck können Sie Baseline-Vorschläge und Drift-Checks gegen eine frühere Baseline für die Datenqualität oder die Modellqualität in einer Pipeline durchführen. Sie können dann Ihre Baselines mit der model.register()-Methode erzeugen und registrieren und die Ausgabe dieser Methode mit Modell Schritt an step_args übergeben. Model Monitor kann diese Basislinien für die Drift-Prüfung Ihrer Modellendpunkte verwenden, sodass Sie einen Basisvorschlag nicht separat erstellen müssen. Bei diesem QualityCheck Schritt können auch Basiswerte für die Driftprüfung aus der Modellregistrierung abgerufen werden. Der QualityCheck Schritt nutzt den vorgefertigten Container von Amazon SageMaker Model Monitor, der über eine Reihe von Modellüberwachungsfunktionen verfügt, einschließlich Einschränkungsvorschlag, Statistikgenerierung und Einschränkungsvalidierung anhand einer Baseline. Weitere Informationen finden Sie unter Vorgefertigter Container von Amazon SageMaker Model Monitor.

Konfigurieren des QualityCheck Schritts

Sie können den QualityCheck Schritt so konfigurieren, dass bei jeder Verwendung in einer Pipeline nur einer der folgenden Prüftypen durchgeführt wird.

  • Überprüfung der Datenqualität

  • Modellqualitätsprüfung

Dazu setzen Sie den quality_check_config Parameter mit einem der folgenden Prüftypwerte:

  • DataQualityCheckConfig

  • ModelQualityCheckConfig

In diesem QualityCheck Schritt wird ein Verarbeitungsauftrag gestartet, der den vorgefertigten Container von Model Monitor ausführt und spezielle Konfigurationen für die Prüfung und den Verarbeitungsauftrag erfordert. Bei den QualityCheckConfig und CheckJobConfig handelt es sich um Hilfsfunktionen für diese Konfigurationen, die darauf abgestimmt sind, wie Model Monitor eine Grundlage für die Überwachung der Modell- oder Datenqualität erstellt. Weitere Informationen zu den Basisvorschlägen von Model Monitor finden Sie unter Erstellen einer Baseline undErstellen Sie eine Basislinie für die Modellqualität.

Steuern des Schrittverhaltens bei der Drift-Prüfung

Für diesen QualityCheck Schritt sind die folgenden zwei booleschen Flags erforderlich, um sein Verhalten zu steuern:

  • skip_check: Dieser Parameter gibt an, ob die Driftprüfung gegenüber der vorherigen Basislinie übersprungen wurde oder nicht. Wenn False auf gesetzt ist, muss die vorherige Basislinie des konfigurierten Prüftyps verfügbar sein.

  • register_new_baseline: Dieser Parameter gibt an, ob auf eine neu berechnete Basislinie über die Schritteigenschaften BaselineUsedForDriftCheckConstraints und BaselineUsedForDriftCheckStatistics. Ist sie auf False eingestellt, muss auch die vorherige Baseline der konfigurierten Prüfart verfügbar sein. Auf diese kann über die Eigenschaften BaselineUsedForDriftCheckConstraints und BaselineUsedForDriftCheckStatistics zugegriffen werden.

Weitere Informationen finden Sie unter Basisberechnung, Abweichungserkennung und Lebenszyklus mit den QualityCheck Schritten ClarifyCheck und in Amazon SageMaker Model Building Pipelines.

Arbeiten mit Baselines

Sie können eine vorherige Basislinie direkt über die Parameter supplied_baseline_statistics und supplied_baseline_constraints angeben, oder Sie können einfach das model_package_group_name angeben und der QualityCheck Schritt ruft das DriftCheckBaselines auf dem letzten genehmigten Modellpaket in der Modellpaketgruppe ab. Wenn Sie, das model_package_group_name, das supplied_baseline_constraints und supplied_baseline_statistics angeben, verwendet der QualityCheck Schritt die Baseline, die durch supplied_baseline_constraints und supplied_baseline_statistics für den Prüftyp des QualityCheck Schritts, den Sie gerade ausführen, festgelegt wurde.

Weitere Informationen zur Verwendung der QualityCheck Schrittanforderungen finden Sie unter sagemaker.workflow.steps.QualityCheckStep im Amazon SageMaker SageMaker SDK for Python . Ein Amazon SageMaker Studio Classic-Notebook, das zeigt, wie Schritt QualityCheck in SageMaker Pipelines verwendet wird, finden Sie unter sagemaker-pipeline-model-monitor-clarify-steps.ipynb.

Beispiel Erstellen eines QualityCheck-Schrittes zur Prüfung der Datenqualität
from sagemaker.workflow.check_job_config import CheckJobConfig from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep from sagemaker.workflow.execution_variables import ExecutionVariables check_job_config = CheckJobConfig( role=role, instance_count=1, instance_type="ml.c5.xlarge", volume_size_in_gb=120, sagemaker_session=sagemaker_session, ) data_quality_check_config = DataQualityCheckConfig( baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"), output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep']) ) data_quality_check_step = QualityCheckStep( name="DataQualityCheckStep", skip_check=False, register_new_baseline=False, quality_check_config=data_quality_check_config, check_job_config=check_job_config, supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json", supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json", model_package_group_name="MyModelPackageGroup" )

EMR-Schritt

Sie können den EMR-Schritt Amazon SageMaker Model Building Pipelines verwenden, um Amazon-EMR-Schritte auf einem laufenden Amazon-EMR-Cluster zu verarbeiten, oder die Pipeline einen Amazon-EMR-Cluster für Sie erstellen und verwalten lassen. Weitere Informationen über Amazon EMR finden Sie unter Erste Schritte mit Amazon EMR.

Der EMR-Schritt erfordert, dass EMRStepConfig den Speicherort der JAR-Datei, die vom Amazon EMR-Cluster verwendet werden soll, und alle zu übergebenden Argumente enthält. Sie geben auch die Amazon EMR-Cluster-ID an, wenn Sie den Schritt auf einem laufenden EMR-Cluster ausführen möchten, oder die Cluster-Konfiguration, wenn der EMR-Schritt auf einem Cluster ausgeführt werden soll, den er für Sie erstellt, verwaltet und beendet. Die folgenden Abschnitte enthalten Beispiele und Links zu Beispiel-Notebooks, die beide Methoden demonstrieren.

Anmerkung
  • EMR-Schritte erfordern, dass die an Ihre Pipeline übergebene Rolle über zusätzliche Berechtigungen verfügt. Sie sollten die AWSverwaltete Richtlinie: AmazonSageMakerPipelinesIntegrations an Ihre Pipeline-Rolle anhängen oder sicherstellen, dass die Rolle die in dieser Richtlinie enthaltenen Berechtigungen umfasst.

  • EMR Step wird weder auf EMR Serverless noch auf Amazon EMR on EKS unterstützt.

  • Wenn Sie einen EMR-Schritt auf einem laufenden Cluster verarbeiten, können Sie nur einen Cluster verwenden, der sich in einem der folgenden Zustände befindet: STARTING, BOOTSTRAPPING, RUNNING, oder WAITING.

  • Wenn Sie EMR-Schritte in einem laufenden Cluster verarbeiten, können Sie maximal 256 EMR-Schritte in einem PENDING Status auf einem EMR-Cluster haben. EMR-Schritte, die über diesen Grenzwert hinaus eingereicht werden, führen zu einem Fehler bei der Pipeline-Ausführung. Sie können auch Richtlinie für Pipeline-Schritte erneut versuchen verwenden.

  • Sie können entweder Cluster-ID oder Cluster-Konfiguration angeben, aber nicht beides.

  • Der EMR-Schritt basiert darauf EventBridge , dass Amazon Änderungen im EMR-Schritt oder Cluster-Status überwacht. Wenn Sie Ihren Amazon EMR-Auftrag auf einem laufenden Cluster verarbeiten, verwendet der EMR-Schritt die SageMakerPipelineExecutionEMRStepStatusUpdateRule Regel zur Überwachung des EMR-Schrittstatus. Wenn Sie Ihren Auftrag auf einem Cluster verarbeiten, den der EMR-Schritt für Sie erstellt, verwendet der Schritt die SageMakerPipelineExecutionEMRClusterStatusRule Regel, um Änderungen im Clusterstatus zu überwachen. Wenn Sie eine dieser EventBridge Regeln in Ihrem AWS Konto sehen, löschen Sie sie nicht, andernfalls wird Ihr EMR-Schritt möglicherweise nicht abgeschlossen.

Starten Sie einen neuen Auftrag auf einem laufenden Amazon EMR-Cluster

Wenn Sie einen neuen Auftrag auf einem laufenden Amazon EMR-Cluster starten möchten, übergeben Sie die Cluster-ID als Zeichenfolge an das cluster_id Argument von EMRStep. Das folgende Beispiel veranschaulicht diese Vorgehensweise.

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) step_emr = EMRStep ( name="EMRSampleStep", # required cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster step_config=emr_config, # required display_name="My EMR Step", description="Pipeline step to execute EMR job" )

Ein Beispiel-Notebook, das Sie durch ein vollständiges Beispiel führt, finden Sie unter SageMaker Pipelines EMR Step with Running EMR Cluster.

Starten Sie einen neuen Auftrag in einem neuen Amazon EMR-Cluster

Wenn Sie einen neuen Auftrag auf einem neuen Cluster starten möchten, den für Sie EMRStep erstellt, geben Sie Ihre Clusterkonfiguration als Wörterbuch mit derselben Struktur wie eine -RunJobFlowAnforderung an. Nehmen Sie jedoch nicht die folgenden Felder in Ihre Clusterkonfiguration auf:

  • [Name]

  • [Steps]

  • [AutoTerminationPolicy]

  • [Instances][KeepJobFlowAliveWhenNoSteps]

  • [Instances][TerminationProtected]

Alle anderen RunJobFlow Argumente können in Ihrer Clusterkonfiguration verwendet werden. Weitere Informationen zur Anforderungssyntax finden Sie unter RunJobFlow.

Im folgenden Beispiel wird eine Clusterkonfiguration an eine EMR-Schrittdefinition übergeben, die den Schritt auffordert, einen neuen Auftrag auf einem neuen EMR-Cluster zu starten. Die EMR-Clusterkonfiguration in diesem Beispiel umfasst Spezifikationen für primäre und zentrale EMR-Clusterknoten. Weitere Informationen zu Amazon EMR-Knotentypen finden Sie unter Grundlegendes zu Knotentypen: Primär-, Kern- und Aufgabenknoten.

from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig emr_step_config = EMRStepConfig( jar="jar-location", # required, path to jar file used args=["--verbose", "--force"], # optional list of arguments to pass to the jar main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest properties=[ # optional list of Java properties that are set when the step runs { "key": "mapred.tasktracker.map.tasks.maximum", "value": "2" }, { "key": "mapreduce.map.sort.spill.percent", "value": "0.90" }, { "key": "mapreduce.tasktracker.reduce.tasks.maximum", "value": "5" } ] ) # include your cluster configuration as a dictionary emr_cluster_config = { "Applications": [ { "Name": "Spark", } ], "Instances":{ "InstanceGroups":[ { "InstanceRole": "MASTER", "InstanceCount": 1, "InstanceType": "m5.2xlarge" }, { "InstanceRole": "CORE", "InstanceCount": 2, "InstanceType": "m5.2xlarge" } ] }, "BootstrapActions":[], "ReleaseLabel": "emr-6.6.0", "JobFlowRole": "job-flow-role", "ServiceRole": "service-role" } emr_step = EMRStep( name="emr-step", cluster_id=None, display_name="emr_step", description="MyEMRStepDescription", step_config=emr_step_config, cluster_config=emr_cluster_config )

Ein Beispiel-Notebook, das Sie durch ein vollständiges Beispiel führt, finden Sie unter SageMaker Pipelines EMR Step with Cluster Lifecycle Management.

Schritt „Notebook-Auftrag“

Verwenden Sie eine NotebookJobStep, um Ihren SageMaker Notebook-Auftrag nicht interaktiv als Pipeline-Schritt auszuführen. Weitere Informationen zu SageMaker Notebook-Aufträgen finden Sie unter SageMaker Notebook-Aufträge.

Ein NotebookJobStep erfordert mindestens ein Eingabe-Notebook, einen Image-URI und einen Kernel-Namen. Weitere Informationen zu den Schrittanforderungen für Notebook-Aufträge und anderen Parametern, die Sie festlegen können, um Ihren Schritt anzupassen, finden Sie unter sagemaker.workflow.steps.NotebookJobStep

Im folgenden Beispiel werden Mindestargumente verwendet, um einen zu definierenNotebookJobStep.

from sagemaker.workflow.notebook_job_step import NotebookJobStep notebook_job_step = NotebookJobStep( input_notebook=input_notebook, image_uri=image_uri, kernel_name=kernel_name )

Ihr NotebookJobStep Pipeline-Schritt wird als SageMaker Notebook-Auftrag behandelt, sodass Sie den Ausführungsstatus im Studio Classic UI-Notebook-Auftrags-Dashboard verfolgen können, wenn Sie bestimmte Tags mit dem tags -Argument einschließen. Weitere Informationen zum Einschließen von Tags finden Sie unter Anzeigen Ihrer Notebook-Aufträge im Studio-UI-Dashboard.

Wenn Sie Ihren Notebook-Auftrag mit dem SageMaker Python SDK planen, können Sie außerdem nur bestimmte Bilder angeben, um Ihren Notebook-Auftrag auszuführen. Weitere Informationen finden Sie unter Image-Einschränkungen für SageMaker Python-SDK-Notebook-Aufträge.

Schritt fehlschlagen

Sie verwenden eine FailStep, um eine Ausführung von Amazon SageMaker Model Building Pipelines zu stoppen, wenn eine gewünschte Bedingung oder ein gewünschter Status nicht erreicht wird, und um die Ausführung dieser Pipeline als fehlgeschlagen zu markieren. In der FailStep können Sie auch eine benutzerdefinierte Fehlermeldung eingeben, die die Ursache für den Ausführungsfehler der Pipeline angibt.

Anmerkung

Wenn ein FailStep und andere Pipeline-Schritte gleichzeitig ausgeführt werden, wird die Pipeline erst beendet, wenn alle gleichzeitigen Schritte abgeschlossen sind.

Einschränkungen bei der Verwendung FailStep

  • Sie können ein FailStep nicht in die DependsOn-Liste anderer Schritte aufnehmen. Weitere Informationen finden Sie unter Benutzerdefinierte Abhängigkeit zwischen Schritten.

  • Andere Schritte können sich nicht auf das FailStep beziehen. Dies ist immer der letzte Schritt bei der Ausführung einer Pipeline.

  • Sie können eine Pipeline-Ausführung, die mit einem FailStep endet, nicht wiederholen.

Sie können die FailStep ErrorMessage in Form einer statischen Textzeichenfolge erstellen. Alternativ können Sie auch Pipeline-Parameter, eine Join-Operation oder andere Schritteigenschaften verwenden, um eine aussagekräftigere Fehlermeldung zu erstellen.

Der folgende Beispielcodeausschnitt verwendet eine FailStep mit Pipelineparametern ErrorMessage konfigurierte Option und eine Join Operation.

from sagemaker.workflow.fail_step import FailStep from sagemaker.workflow.functions import Join from sagemaker.workflow.parameters import ParameterInteger mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5) step_fail = FailStep( name="AbaloneMSEFail", error_message=Join( on=" ", values=["Execution failed due to MSE >", mse_threshold_param] ), )

Eigenschaften des Schritts

Das properties Attribut wird verwendet, um Datenabhängigkeiten zwischen Schritten in der Pipeline hinzuzufügen. Diese Datenabhängigkeiten werden dann von SageMaker Pipelines verwendet, um die DAG aus der Pipeline-Definition zu erstellen. Diese Eigenschaften können als Platzhalterwerte referenziert werden und werden zur Laufzeit aufgelöst.

Das -propertiesAttribut eines SageMaker Pipelines-Schritts entspricht dem -Objekt, das von einem -DescribeAufruf für den entsprechenden SageMaker Auftragstyp zurückgegeben wird. Für jeden Auftragstyp gibt der Describe Aufruf das folgende Antwortobjekt zurück:

Informationen dazu, welche Eigenschaften während der Erstellung der Datenabhängigkeit für jeden Schritttyp referenzierbar sind, finden Sie unter Datenabhängigkeit – Eigenschaftsreferenz im Amazon SageMaker Python SDK.

Schrittparallelität

Wenn ein Schritt nicht von einem anderen Schritt abhängt, wird er sofort nach der Ausführung der Pipeline ausgeführt. Die parallel Ausführung zu vieler Pipeline-Schritte kann jedoch schnell die verfügbaren Ressourcen erschöpfen. Steuern Sie die Anzahl der gleichzeitigen Schritte für eine Pipeline-Ausführung mit ParallelismConfiguration.

Im folgenden Beispiel wird ParallelismConfiguration verwendet, um die Grenze für gleichzeitige Schritte auf fünf zu setzen.

pipeline.create( parallelism_config=ParallelismConfiguration(5), )

Datenabhängigkeit zwischen Schritten

Sie definieren die Struktur Ihrer DAG, indem Sie die Datenbeziehungen zwischen den Schritten angeben. Um Datenabhängigkeiten zwischen Schritten herzustellen, übergeben Sie die Eigenschaften eines Schritts als Eingabe an einen anderen Schritt in der Pipeline. Der Schritt, der die Eingabe empfängt, wird erst gestartet, nachdem der Schritt, der die Eingabe bereitstellt, abgeschlossen ist.

Eine Datenabhängigkeit verwendet die JsonPath Notation im folgenden Format. Dieses Format durchläuft die JSON-Eigenschaftendatei, was bedeutet, dass Sie so viele<property> Instanzen wie nötig anhängen können, um die gewünschte verschachtelte Eigenschaft in der Datei zu erreichen. Weitere Informationen zur JsonPath Notation finden Sie im JsonPath Repo .

<step_name>.properties.<property>.<property>

Im Folgenden wird gezeigt, wie ein Amazon-S3-Bucket mithilfe der ProcessingOutputConfig Eigenschaft eines Verarbeitungsschritts angegeben wird.

step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri

Um die Datenabhängigkeit zu erstellen, übergeben Sie den Bucket wie folgt an einen Trainingsschritt.

from sagemaker.workflow.pipeline_context import PipelineSession sklearn_train = SKLearn(..., sagemaker_session=PipelineSession()) step_train = TrainingStep( name="CensusTrain", step_args=sklearn_train.fit(inputs=TrainingInput( s3_data=step_process.properties.ProcessingOutputConfig.Outputs[ "train_data"].S3Output.S3Uri )) )

Informationen dazu, welche Eigenschaften während der Erstellung der Datenabhängigkeit für jeden Schritttyp referenzierbar sind, finden Sie unter Datenabhängigkeit – Eigenschaftsreferenz im Amazon SageMaker Python SDK.

Benutzerdefinierte Abhängigkeit zwischen Schritten

Wenn Sie eine Datenabhängigkeit angeben, stellt SageMaker Pipelines die Datenverbindung zwischen den Schritten bereit. Alternativ kann ein Schritt auf die Daten aus einem vorherigen Schritt zugreifen, ohne SageMaker Pipelines direkt zu verwenden. In diesem Fall können Sie eine benutzerdefinierte Abhängigkeit erstellen, die SageMaker Pipelines anweist, einen Schritt erst zu starten, nachdem ein anderer Schritt abgeschlossen ist. Sie erstellen eine benutzerdefinierte Abhängigkeit, indem Sie DependsOn -Attribut eines Schritts angeben.

Im Folgenden wird beispielsweise ein Schritt C definiert, der erst beginnt, wenn sowohl der Schritt A als auch der Schritt B abgeschlossen sind.

{ 'Steps': [ {'Name':'A', ...}, {'Name':'B', ...}, {'Name':'C', 'DependsOn': ['A', 'B']} ] }

SageMaker Pipelines löst eine Validierungsausnahme aus, wenn die Abhängigkeit eine zyklische Abhängigkeit erzeugen würde.

Im folgenden Beispiel wird ein Trainingsschritt erstellt, der beginnt, nachdem ein Verarbeitungsschritt abgeschlossen ist.

processing_step = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step])

Im folgenden Beispiel wird ein Trainingsschritt erstellt, der erst beginnt, wenn zwei verschiedene Verarbeitungsschritte abgeschlossen sind.

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep(...) training_step.add_depends_on([processing_step_1, processing_step_2])

Im Folgenden wird eine alternative Methode zum Erstellen der benutzerdefinierten Abhängigkeit beschrieben.

training_step.add_depends_on([processing_step_1]) training_step.add_depends_on([processing_step_2])

Im folgenden Beispiel wird ein Trainingsschritt erstellt, der Eingaben von einem Verarbeitungsschritt empfängt und darauf wartet, dass ein anderer Verarbeitungsschritt abgeschlossen ist.

processing_step_1 = ProcessingStep(...) processing_step_2 = ProcessingStep(...) training_step = TrainingStep( ..., inputs=TrainingInput( s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[ "train_data" ].S3Output.S3Uri ) training_step.add_depends_on([processing_step_2])

Das folgende Beispiel zeigt, wie eine Stringliste der benutzerdefinierten Abhängigkeiten eines Schritts abgerufen wird.

custom_dependencies = training_step.depends_on

Verwenden Sie ein benutzerdefiniertes Bild in einem Schritt

Sie können jedes der verfügbaren SageMakerDeep Learning Container-Images verwenden, wenn Sie einen Schritt in Ihrer Pipeline erstellen.

Sie können auch Ihren eigenen -Container mit Pipeline-Schritten verwenden. Da Sie kein Image aus Amazon SageMaker Studio Classic erstellen können, müssen Sie Ihr Image mit einer anderen Methode erstellen, bevor Sie es mit Amazon SageMaker Model Building Pipelines verwenden.

Um bei der Erstellung der Schritte für Ihre Pipeline Ihren eigenen Container zu verwenden, fügen Sie den Bild-URI in die Estimator-Definition ein. Weitere Informationen zur Verwendung Ihres eigenen Containers mit SageMakerfinden Sie unter Verwenden von Docker-Containern mit SageMaker.