Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Erstellen Sie eine Pipeline mit @step
-dekorierten Funktionen
Sie können eine Pipeline erstellen, indem Sie Python-Funktionen mithilfe des @step
Decorators in Pipeline-Schritte konvertieren, Abhängigkeiten zwischen diesen Funktionen erstellen, um einen Pipeline-Graphen (oder einen gerichteten azyklischen Graphen (DAG)) zu erstellen, und die Blattknoten dieses Graphen als Liste von Schritten an die Pipeline übergeben. In den folgenden Abschnitten wird dieses Verfahren anhand von Beispielen ausführlich erläutert.
Themen
Konvertiert eine Funktion in einen Schritt
Um einen Schritt mit dem @step
Decorator zu erstellen, kommentieren Sie die Funktion mit. @step
Das folgende Beispiel zeigt eine @step
mit -dekorierte Funktion, die die Daten vorverarbeitet.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)
Wenn Sie eine @step
mit -dekorierte Funktion aufrufen, wird eine DelayedReturn
Instanz SageMaker zurückgegeben, anstatt die Funktion auszuführen. Eine DelayedReturn
Instanz ist ein Proxy für die tatsächliche Rückgabe dieser Funktion. Die DelayedReturn
Instanz kann als Argument an eine andere Funktion oder als Schritt direkt an eine Pipeline-Instanz übergeben werden. Informationen zur DelayedReturn
Klasse finden Sie unter sagemaker.workflow.function_step. DelayedReturn
Erstellen Sie Abhängigkeiten zwischen den Schritten
Wenn Sie eine Abhängigkeit zwischen zwei Schritten erstellen, stellen Sie eine Verbindung zwischen den Schritten in Ihrem Pipeline-Diagramm her. In den folgenden Abschnitten werden mehrere Möglichkeiten vorgestellt, wie Sie eine Abhängigkeit zwischen Ihren Pipeline-Schritten herstellen können.
Datenabhängigkeiten durch Eingabeargumente
Wenn Sie die DelayedReturn
Ausgabe einer Funktion als Eingabe an eine andere Funktion übergeben, entsteht automatisch eine Datenabhängigkeit in der PipelineDAG. Im folgenden Beispiel erzeugt die Übergabe der DelayedReturn
Ausgabe der preprocess
Funktion an die train
Funktion eine Abhängigkeit zwischen preprocess
undtrain
.
from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)
Das vorherige Beispiel definiert eine Trainingsfunktion, die mit ausgestattet ist@step
. Wenn diese Funktion aufgerufen wird, erhält sie die DelayedReturn
Ausgabe des Vorverarbeitungs-Pipeline-Schritts als Eingabe. Beim Aufrufen der Trainingsfunktion wird eine weitere Instanz zurückgegeben. DelayedReturn
Diese Instanz enthält die Informationen über alle vorherigen Schritte, die in dieser Funktion definiert wurden (d. h. der preprocess
Schritt in diesem Beispiel), die die Pipeline DAG bilden.
Im vorherigen Beispiel gibt die preprocess
Funktion einen einzelnen Wert zurück. Für komplexere Rückgabetypen wie Listen oder Tupel siehe. Einschränkungen
Definieren Sie benutzerdefinierte Abhängigkeiten
Im vorherigen Beispiel hat die train
Funktion die DelayedReturn
Ausgabe von empfangen preprocess
und eine Abhängigkeit erstellt. Wenn Sie die Abhängigkeit explizit definieren möchten, ohne die Ausgabe des vorherigen Schritts zu übergeben, verwenden Sie die add_depends_on
Funktion mit dem Schritt. Sie können die get_step()
Funktion verwenden, um den zugrunde liegenden Schritt aus seiner DelayedReturn
Instanz abzurufen, und dann add_depends_on
_on mit der Abhängigkeit als Eingabe aufrufen. Die get_step()
Funktionsdefinition finden Sie unter sagemaker.workflow.step_outputs.get_steppreprocess
train
get_step()
add_depends_on()
from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])
Übergeben Sie Daten an und von einer @step
mit -dekorierten Funktion an einen herkömmlichen Pipeline-Schritt
Sie können eine Pipeline erstellen, die einen Schritt mit einer @step
Markierung und einen herkömmlichen Pipeline-Schritt umfasst und Daten zwischen diesen weiterleitet. Sie können sie beispielsweise verwenden, um die Daten ProcessingStep
zu verarbeiten und das Ergebnis an die Trainingsfunktion @step
mit -dekoriertem Dekor weiterzuleiten. Im folgenden Beispiel verweist ein @step
mit -dekorierter Trainingsschritt auf die Ausgabe eines Verarbeitungsschritts.
# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=
input_data
, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a
@step
-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )
Wird ConditionStep
zusammen mit Schritten verwendet, die mit @step
-verziert sind
Pipelines unterstützt eine ConditionStep
Klasse, die die Ergebnisse der vorherigen Schritte auswertet, um zu entscheiden, welche Maßnahmen in der Pipeline ergriffen werden sollen. Sie können es auch ConditionStep
mit einem Schritt verwenden, der mit @step
einem Symbol versehen ist. Um die Ausgabe eines beliebigen Schritts mit @step
-dekorierten Zeichen zu verwendenConditionStep
, geben Sie die Ausgabe dieses Schritts als Argument für ein. ConditionStep
Im folgenden Beispiel erhält der Bedingungsschritt die Ausgabe des Bewertungsschritts für das @step
mit -dekorierte Modell.
# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )
Definieren Sie eine Pipeline anhand der DelayedReturn
Ausgabe von Schritten
Sie definieren eine Pipeline auf die gleiche Weise, unabhängig davon, ob Sie einen @step
Decorator verwenden oder nicht. Wenn Sie eine DelayedReturn
Instanz an Ihre Pipeline übergeben, müssen Sie keine vollständige Liste der Schritte zum Erstellen der Pipeline übergeben. Die SDK leitet automatisch die vorherigen Schritte auf der Grundlage der von Ihnen definierten Abhängigkeiten ab. Alle vorherigen Schritte der Step
Objekte, die Sie an die Pipeline übergeben haben, oder der DelayedReturn
Objekte, sind im Pipeline-Diagramm enthalten. Im folgenden Beispiel empfängt die Pipeline das DelayedReturn
Objekt für die train
Funktion. SageMaker fügt den preprocess
Schritt als vorherigen Schritt von train
zum Pipeline-Diagramm hinzu.
from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_train_result], sagemaker_session=<sagemaker-session>
, )
Wenn zwischen den Schritten keine Daten oder benutzerdefinierten Abhängigkeiten bestehen und Sie mehrere Schritte parallel ausführen, hat das Pipeline-Diagramm mehr als einen Blattknoten. Übergeben Sie all diese Blattknoten in einer Liste an das steps
Argument in Ihrer Pipeline-Definition, wie im folgenden Beispiel gezeigt:
@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="
<pipeline-name>
", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session
, )
Wenn die Pipeline läuft, laufen beide Schritte parallel.
Sie übergeben nur die Blattknoten des Diagramms an die Pipeline, da die Blattknoten Informationen über alle vorherigen Schritte enthalten, die durch Daten oder benutzerdefinierte Abhängigkeiten definiert wurden. Beim Kompilieren der Pipeline wird SageMaker auch von allen nachfolgenden Schritten abgeleitet, die das Pipeline-Diagramm bilden, und jeder Schritt wird der Pipeline als separater Schritt hinzugefügt.
Erstellen Sie eine Pipeline
Erstellen Sie eine Pipeline durch Aufrufenpipeline.create()
, wie im folgenden Codeausschnitt gezeigt. Einzelheiten dazu finden Sie unter create()
SageMaker.Workflow.Pipeline.Pipeline.Create.
role = "
pipeline-role
" pipeline.create(role)
Kompiliert beim Aufrufen alle Schritte, die als Teil der Pipeline-Instanz pipeline.create()
definiert SageMaker sind. SageMaker lädt die serialisierte Funktion, die Argumente und alle anderen schrittbezogenen Artefakte auf Amazon S3 hoch.
Die Daten befinden sich gemäß der folgenden Struktur im S3-Bucket:
s3_root_uri/
pipeline_name
/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir)step_name
/timestamp
/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the stepexecution_id
/step_name
/ results # returned output from the serialized function including the model
s3_root_uri
ist in der SageMaker Konfigurationsdatei definiert und gilt für die gesamte Pipeline. Wenn nicht definiert, wird der SageMaker Standard-Bucket verwendet.
Anmerkung
Jedes Mal, wenn eine Pipeline SageMaker SageMaker kompiliert wird, werden die serialisierten Funktionen, Argumente und Abhängigkeiten der Schritte in einem Ordner gespeichert, der mit der aktuellen Uhrzeit versehen ist. Dies geschieht jedes Mal, wenn Sie, oder ausführen. pipeline.create()
pipeline.update()
pipeline.upsert()
pipeline.definition()