Erstellen Sie eine Pipeline mit @step -dekorierten Funktionen - Amazon SageMaker

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Erstellen Sie eine Pipeline mit @step -dekorierten Funktionen

Sie können eine Pipeline erstellen, indem Sie Python-Funktionen mithilfe des @step Decorators in Pipeline-Schritte konvertieren, Abhängigkeiten zwischen diesen Funktionen erstellen, um einen Pipeline-Graphen (oder einen gerichteten azyklischen Graphen (DAG)) zu erstellen, und die Blattknoten dieses Graphen als Liste von Schritten an die Pipeline übergeben. In den folgenden Abschnitten wird dieses Verfahren anhand von Beispielen ausführlich erläutert.

Konvertiert eine Funktion in einen Schritt

Um einen Schritt mit dem @step Decorator zu erstellen, kommentieren Sie die Funktion mit. @step Das folgende Beispiel zeigt eine @step mit -dekorierte Funktion, die die Daten vorverarbeitet.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

Wenn Sie eine @step mit -dekorierte Funktion aufrufen, wird eine DelayedReturn Instanz SageMaker zurückgegeben, anstatt die Funktion auszuführen. Eine DelayedReturn Instanz ist ein Proxy für die tatsächliche Rückgabe dieser Funktion. Die DelayedReturn Instanz kann als Argument an eine andere Funktion oder als Schritt direkt an eine Pipeline-Instanz übergeben werden. Informationen zur DelayedReturn Klasse finden Sie unter sagemaker.workflow.function_step. DelayedReturn.

Wenn Sie eine Abhängigkeit zwischen zwei Schritten erstellen, stellen Sie eine Verbindung zwischen den Schritten in Ihrem Pipeline-Diagramm her. In den folgenden Abschnitten werden mehrere Möglichkeiten vorgestellt, wie Sie eine Abhängigkeit zwischen Ihren Pipeline-Schritten herstellen können.

Wenn Sie die DelayedReturn Ausgabe einer Funktion als Eingabe an eine andere Funktion übergeben, entsteht automatisch eine Datenabhängigkeit in der PipelineDAG. Im folgenden Beispiel erzeugt die Übergabe der DelayedReturn Ausgabe der preprocess Funktion an die train Funktion eine Abhängigkeit zwischen preprocess undtrain.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

Das vorherige Beispiel definiert eine Trainingsfunktion, die mit ausgestattet ist@step. Wenn diese Funktion aufgerufen wird, erhält sie die DelayedReturn Ausgabe des Vorverarbeitungs-Pipeline-Schritts als Eingabe. Beim Aufrufen der Trainingsfunktion wird eine weitere Instanz zurückgegeben. DelayedReturn Diese Instanz enthält die Informationen über alle vorherigen Schritte, die in dieser Funktion definiert wurden (d. h. der preprocess Schritt in diesem Beispiel), die die Pipeline DAG bilden.

Im vorherigen Beispiel gibt die preprocess Funktion einen einzelnen Wert zurück. Für komplexere Rückgabetypen wie Listen oder Tupel siehe. Einschränkungen

Im vorherigen Beispiel hat die train Funktion die DelayedReturn Ausgabe von empfangen preprocess und eine Abhängigkeit erstellt. Wenn Sie die Abhängigkeit explizit definieren möchten, ohne die Ausgabe des vorherigen Schritts zu übergeben, verwenden Sie die add_depends_on Funktion mit dem Schritt. Sie können die get_step() Funktion verwenden, um den zugrunde liegenden Schritt aus seiner DelayedReturn Instanz abzurufen, und dann add_depends_on _on mit der Abhängigkeit als Eingabe aufrufen. Die get_step() Funktionsdefinition finden Sie unter sagemaker.workflow.step_outputs.get_step. Das folgende Beispiel zeigt Ihnen, wie Sie eine Abhängigkeit zwischen und mithilfe von und erstellen. preprocess train get_step() add_depends_on()

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

Sie können eine Pipeline erstellen, die einen Schritt mit einer @step Markierung und einen herkömmlichen Pipeline-Schritt umfasst und Daten zwischen diesen weiterleitet. Sie können sie beispielsweise verwenden, um die Daten ProcessingStep zu verarbeiten und das Ergebnis an die Trainingsfunktion @step mit -dekoriertem Dekor weiterzuleiten. Im folgenden Beispiel verweist ein @step mit -dekorierter Trainingsschritt auf die Ausgabe eines Verarbeitungsschritts.

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

Wird ConditionStep zusammen mit Schritten verwendet, die mit @step -verziert sind

Pipelines unterstützt eine ConditionStep Klasse, die die Ergebnisse der vorherigen Schritte auswertet, um zu entscheiden, welche Maßnahmen in der Pipeline ergriffen werden sollen. Sie können es auch ConditionStep mit einem Schritt verwenden, der mit @step einem Symbol versehen ist. Um die Ausgabe eines beliebigen Schritts mit @step -dekorierten Zeichen zu verwendenConditionStep, geben Sie die Ausgabe dieses Schritts als Argument für ein. ConditionStep Im folgenden Beispiel erhält der Bedingungsschritt die Ausgabe des Bewertungsschritts für das @step mit -dekorierte Modell.

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

Definieren Sie eine Pipeline anhand der DelayedReturn Ausgabe von Schritten

Sie definieren eine Pipeline auf die gleiche Weise, unabhängig davon, ob Sie einen @step Decorator verwenden oder nicht. Wenn Sie eine DelayedReturn Instanz an Ihre Pipeline übergeben, müssen Sie keine vollständige Liste der Schritte zum Erstellen der Pipeline übergeben. Die SDK leitet automatisch die vorherigen Schritte auf der Grundlage der von Ihnen definierten Abhängigkeiten ab. Alle vorherigen Schritte der Step Objekte, die Sie an die Pipeline übergeben haben, oder der DelayedReturn Objekte, sind im Pipeline-Diagramm enthalten. Im folgenden Beispiel empfängt die Pipeline das DelayedReturn Objekt für die train Funktion. SageMaker fügt den preprocess Schritt als vorherigen Schritt von train zum Pipeline-Diagramm hinzu.

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

Wenn zwischen den Schritten keine Daten oder benutzerdefinierten Abhängigkeiten bestehen und Sie mehrere Schritte parallel ausführen, hat das Pipeline-Diagramm mehr als einen Blattknoten. Übergeben Sie all diese Blattknoten in einer Liste an das steps Argument in Ihrer Pipeline-Definition, wie im folgenden Beispiel gezeigt:

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

Wenn die Pipeline läuft, laufen beide Schritte parallel.

Sie übergeben nur die Blattknoten des Diagramms an die Pipeline, da die Blattknoten Informationen über alle vorherigen Schritte enthalten, die durch Daten oder benutzerdefinierte Abhängigkeiten definiert wurden. Beim Kompilieren der Pipeline wird SageMaker auch von allen nachfolgenden Schritten abgeleitet, die das Pipeline-Diagramm bilden, und jeder Schritt wird der Pipeline als separater Schritt hinzugefügt.

Erstellen Sie eine Pipeline

Erstellen Sie eine Pipeline durch Aufrufenpipeline.create(), wie im folgenden Codeausschnitt gezeigt. Einzelheiten dazu finden Sie unter create() SageMaker.Workflow.Pipeline.Pipeline.Create.

role = "pipeline-role" pipeline.create(role)

Kompiliert beim Aufrufen alle Schritte, die als Teil der Pipeline-Instanz pipeline.create() definiert SageMaker sind. SageMaker lädt die serialisierte Funktion, die Argumente und alle anderen schrittbezogenen Artefakte auf Amazon S3 hoch.

Die Daten befinden sich gemäß der folgenden Struktur im S3-Bucket:

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_uriist in der SageMaker Konfigurationsdatei definiert und gilt für die gesamte Pipeline. Wenn nicht definiert, wird der SageMaker Standard-Bucket verwendet.

Anmerkung

Jedes Mal, wenn eine Pipeline SageMaker SageMaker kompiliert wird, werden die serialisierten Funktionen, Argumente und Abhängigkeiten der Schritte in einem Ordner gespeichert, der mit der aktuellen Uhrzeit versehen ist. Dies geschieht jedes Mal, wenn Sie, oder ausführen. pipeline.create() pipeline.update() pipeline.upsert() pipeline.definition()