Crea una canalización con funciones decoradas @step - Amazon SageMaker

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Crea una canalización con funciones decoradas @step

Puede crear una canalización convirtiendo las funciones de Python en pasos de canalización mediante el @step decorador, creando dependencias entre esas funciones para crear un gráfico de canalización (o gráfico acíclico dirigido (DAG)) y pasando los nodos de hoja de ese gráfico como una lista de pasos a la canalización. En las siguientes secciones se explica este procedimiento en detalle con ejemplos.

Convierte una función en un paso

Para crear un paso con el @step decorador, anota la función con. @step En el siguiente ejemplo, se muestra una función @step decorada con símbolos que preprocesa los datos.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

Al invocar una función @step decorada con símbolos, SageMaker devuelve una DelayedReturn instancia en lugar de ejecutar la función. Una DelayedReturn instancia es un proxy del retorno real de esa función. La DelayedReturn instancia se puede pasar a otra función como argumento o directamente a una instancia de canalización como paso. Para obtener información sobre la DelayedReturn clase, consulta sagemaker.workflow.function_step. DelayedReturn.

Al crear una dependencia entre dos pasos, se crea una conexión entre los pasos del gráfico de canalización. En las siguientes secciones, se presentan varias formas de crear una dependencia entre los pasos de la canalización.

Al pasar la DelayedReturn salida de una función como entrada a otra función, se crea automáticamente una dependencia de datos en el DAG de canalización. En el siguiente ejemplo, al pasar la DelayedReturn salida de la preprocess función a la train función se crea una dependencia entre preprocess ytrain.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

El ejemplo anterior define una función de entrenamiento que está decorada con@step. Cuando se invoca esta función, recibe como entrada el DelayedReturn resultado del paso de la canalización de preprocesamiento. Al invocar la función de entrenamiento, se devuelve otra DelayedReturn instancia. Esta instancia contiene la información sobre todos los pasos anteriores definidos en esa función (es decir, el preprocess paso de este ejemplo) que forman el DAG de canalización.

En el ejemplo anterior, la preprocess función devuelve un único valor. Para obtener información sobre tipos de retorno más complejos, como listas o tuplas, consulte. Limitaciones

En el ejemplo anterior, la train función recibió el DelayedReturn resultado de preprocess y creó una dependencia. Si desea definir la dependencia de forma explícita sin pasar el resultado del paso anterior, utilice la add_depends_on función con el paso. Puedes usar la get_step() función para recuperar el paso subyacente de su DelayedReturn instancia y, a continuación, llamar a add_depends_on _on con la dependencia como entrada. Para ver la definición de la get_step() función, consulta sagemaker.workflow.step_outputs.get_step. El siguiente ejemplo muestra cómo crear una dependencia entre y usar y. preprocess train get_step() add_depends_on()

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

Puede crear una canalización que incluya un @step escalón decorado y un escalón de canalización tradicional y pasar datos entre ellos. Por ejemplo, puede utilizarlos ProcessingStep para procesar los datos y pasar su resultado a la función @step de entrenamiento decorada con letras. En el siguiente ejemplo, un paso @step de entrenamiento decorado hace referencia al resultado de un paso de procesamiento.

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

ConditionStepUtilízalo con peldaños @step decorados

SageMaker Pipelines apoya a una ConditionStep clase que evalúa los resultados de los pasos anteriores para decidir qué medidas tomar en el proceso. También se puede utilizar ConditionStep con un @step escalón decorado. Para usar el resultado de cualquier paso @step decorado con un símboloConditionStep, introduce el resultado de ese paso como argumento para. ConditionStep En el siguiente ejemplo, el paso de condición recibe el resultado del paso de evaluación del modelo @step decorado con bordes.

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

Defina una canalización utilizando el DelayedReturn resultado de los pasos

Una canalización se define de la misma manera independientemente de si se utiliza o no un @step decorador. Cuando pasas una DelayedReturn instancia a tu canalización, no necesitas pasar una lista completa de pasos para crear la canalización. El SDK deduce automáticamente los pasos anteriores en función de las dependencias que definas. Todos los pasos anteriores de los Step objetos que has pasado a la canalización o a los DelayedReturn objetos se incluyen en el gráfico de canalización. En el siguiente ejemplo, la canalización recibe el DelayedReturn objeto de la train función. SageMaker añade el preprocess paso, como paso anteriortrain, al gráfico de canalización.

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

Si no hay datos o dependencias personalizadas entre los pasos y ejecutas varios pasos en paralelo, el gráfico de canalización tiene más de un nodo hoja. Transfiere todos estos nodos de hoja de una lista al steps argumento de la definición de tu canalización, como se muestra en el siguiente ejemplo:

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

Cuando se ejecuta la canalización, ambos pasos se ejecutan en paralelo.

Solo se pasan los nodos de hoja del gráfico a la canalización, ya que los nodos de hoja contienen información sobre todos los pasos anteriores definidos mediante datos o dependencias personalizadas. Al compilar la canalización, SageMaker también deduce todos los pasos subsiguientes que forman el gráfico de canalización y añade cada uno de ellos como un paso independiente a la canalización.

Creación de una canalización

Crea una canalización mediante una llamadapipeline.create(), como se muestra en el siguiente fragmento. Para obtener más información, consulte create() SageMaker.workflow.Pipeline.Pipeline.create.

role = "pipeline-role" pipeline.create(role)

Cuando llamas, compila todos los pasos definidos como parte de la instancia de pipeline.create() SageMaker canalización. SageMaker carga la función serializada, los argumentos y todos los demás artefactos relacionados con los pasos en Amazon S3.

Los datos residen en el depósito de S3 de acuerdo con la siguiente estructura:

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_urise define en el archivo de SageMaker configuración y se aplica a toda la canalización. Si no está definido, se utiliza el SageMaker depósito predeterminado.

nota

Cada vez SageMaker que compila una canalización, SageMaker guarda las funciones, los argumentos y las dependencias serializados de los pasos en una carpeta con la hora actual. Esto ocurre cada vez que ejecutas, o. pipeline.create() pipeline.update() pipeline.upsert() pipeline.definition()