Crie um pipeline com funções @step decoradas - Amazon SageMaker

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Crie um pipeline com funções @step decoradas

Você pode criar um pipeline convertendo funções do Python em etapas do pipeline usando @step o decorador, criando dependências entre essas funções para criar um gráfico do pipeline (ou gráfico acíclico direcionado (DAG)) e passando os nós da folha desse gráfico como uma lista de etapas para o pipeline. As seções a seguir explicam esse procedimento em detalhes com exemplos.

Converter uma função em uma etapa

Para criar uma etapa usando o @step decorador, anote a função com. @step O exemplo a seguir mostra uma função @step -decorada que pré-processa os dados.

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe step_process_result = preprocess(raw_data)

Quando você invoca uma função @step -decorada, SageMaker retorna uma DelayedReturn instância em vez de executar a função. Uma DelayedReturn instância é um proxy para o retorno real dessa função. A DelayedReturn instância pode ser passada para outra função como argumento ou diretamente para uma instância do pipeline como uma etapa. Para obter informações sobre a DelayedReturn classe, consulte sagemaker.workflow.function_step. DelayedReturn.

Ao criar uma dependência entre duas etapas, você cria uma conexão entre as etapas no gráfico do pipeline. As seções a seguir apresentam várias maneiras de criar uma dependência entre as etapas do pipeline.

Passar a DelayedReturn saída de uma função como entrada para outra função cria automaticamente uma dependência de dados no pipeline DAG. No exemplo a seguir, passar a DelayedReturn saída da preprocess função para a train função cria uma dependência entre preprocess e. train

from sagemaker.workflow.function_step import step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... return procesed_dataframe @step def train(training_data): ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train(step_process_result)

O exemplo anterior define uma função de treinamento que é decorada com@step. Quando essa função é invocada, ela recebe a DelayedReturn saída da etapa do pipeline de pré-processamento como entrada. A invocação da função de treinamento retorna outra DelayedReturn instância. Essa instância contém as informações sobre todas as etapas anteriores definidas nessa função (ou seja, a preprocess etapa neste exemplo) que formam o pipeline DAG.

No exemplo anterior, a preprocess função retorna um único valor. Para tipos de retorno mais complexos, como listas ou tuplas, consulte. Limitações

No exemplo anterior, a train função recebeu a DelayedReturn saída de preprocess e criou uma dependência. Se você quiser definir a dependência explicitamente sem passar a saída da etapa anterior, use a add_depends_on função com a etapa. Você pode usar a get_step() função para recuperar a etapa subjacente de sua DelayedReturn instância e, em seguida, chamar add_depends_on _on com a dependência como entrada. Para ver a definição da get_step() função, consulte sagemaker.workflow.step_outputs.get_step. O exemplo a seguir mostra como criar uma dependência entre preprocess get_step() e train usando e. add_depends_on()

from sagemaker.workflow.step_outputs import get_step @step def preprocess(raw_data): df = pandas.read_csv(raw_data) ... processed_data = .. return s3.upload(processed_data) @step def train(): training_data = s3.download(....) ... return trained_model step_process_result = preprocess(raw_data) step_train_result = train() get_step(step_train_result).add_depends_on([step_process_result])

Você pode criar um pipeline que inclua uma etapa @step decorada e uma etapa tradicional do pipeline e transmita dados entre elas. Por exemplo, você pode usar ProcessingStep para processar os dados e passar o resultado para a função de treinamento @step -decorada. No exemplo a seguir, uma etapa @step de treinamento decorada faz referência à saída de uma etapa de processamento.

# Define processing step from sagemaker.sklearn.processing import SKLearnProcessor from sagemaker.processing import ProcessingInput, ProcessingOutput from sagemaker.workflow.steps import ProcessingStep sklearn_processor = SKLearnProcessor( framework_version='1.2-1', role='arn:aws:iam::123456789012:role/SagemakerExecutionRole', instance_type='ml.m5.large', instance_count='1', ) inputs = [ ProcessingInput(source=input_data, destination="/opt/ml/processing/input"), ] outputs = [ ProcessingOutput(output_name="train", source="/opt/ml/processing/train"), ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"), ProcessingOutput(output_name="test", source="/opt/ml/processing/test") ] process_step = ProcessingStep( name="MyProcessStep", step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'), )
# Define a @step-decorated train step which references the # output of a processing step @step def train(train_data_path, test_data_path): ... return trained_model step_train_result = train( process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri, process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri, )

Use ConditionStep com @step degraus decorados

SageMaker Os pipelines oferecem suporte a uma ConditionStep classe que avalia os resultados das etapas anteriores para decidir qual ação tomar no pipeline. Você também pode usar ConditionStep com um @step degrau decorado. Para usar a saída de qualquer etapa @step decorada comConditionStep, insira a saída dessa etapa como argumento paraConditionStep. No exemplo a seguir, a etapa de condição recebe a saída da etapa de avaliação do modelo @step -decorado.

# Define steps @step(name="evaluate") def evaluate_model(): # code to evaluate the model return { "rmse":rmse_value } @step(name="register") def register_model(): # code to register the model ...
# Define ConditionStep from sagemaker.workflow.condition_step import ConditionStep from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo from sagemaker.workflow.fail_step import FailStep conditionally_register = ConditionStep( name="conditional_register", conditions=[ ConditionGreaterThanOrEqualTo( # Output of the evaluate step must be json serializable left=evaluate_model()["rmse"], # right=5, ) ], if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")], else_steps=[register_model()], )

Defina um pipeline usando a DelayedReturn saída das etapas

Você define um pipeline da mesma forma, independentemente de usar ou não um @step decorador. Ao passar uma DelayedReturn instância para seu pipeline, você não precisa passar uma lista completa de etapas para criar o pipeline. O SDK infere automaticamente as etapas anteriores com base nas dependências que você define. Todas as etapas anteriores dos Step objetos que você passou para o pipeline ou DelayedReturn objetos estão incluídas no gráfico do pipeline. No exemplo a seguir, o pipeline recebe o DelayedReturn objeto da train função. SageMaker adiciona a preprocess etapa, como uma etapa anterior dotrain, ao gráfico do pipeline.

from sagemaker.workflow.pipeline import Pipeline pipeline = Pipeline( name="<pipeline-name>", steps=[step_train_result], sagemaker_session=<sagemaker-session>, )

Se não houver dados ou dependências personalizadas entre as etapas e você executar várias etapas em paralelo, o gráfico do pipeline terá mais de um nó foliar. Passe todos esses nós de folha em uma lista para o steps argumento em sua definição de pipeline, conforme mostrado no exemplo a seguir:

@step def process1(): ... return data @step def process2(): ... return data step_process1_result = process1() step_process2_result = process2() pipeline = Pipeline( name="<pipeline-name>", steps=[step_process1_result, step_process2_result], sagemaker_session=sagemaker-session, )

Quando a tubulação é executada, as duas etapas são executadas paralelamente.

Você só passa os nós da folha do gráfico para o pipeline porque os nós da folha contêm informações sobre todas as etapas anteriores definidas por meio de dados ou dependências personalizadas. Ao compilar o pipeline, SageMaker também infere todas as etapas subsequentes que formam o gráfico do pipeline e adiciona cada uma delas como uma etapa separada ao pipeline.

Criar um pipeline

Crie um pipeline chamandopipeline.create(), conforme mostrado no trecho a seguir. Para obter detalhescreate(), consulte SageMaker.Workflow.Pipeline.Pipeline.create.

role = "pipeline-role" pipeline.create(role)

Quando você chamapipeline.create(), SageMaker compila todas as etapas definidas como parte da instância do pipeline. SageMaker carrega a função serializada, os argumentos e todos os outros artefatos relacionados à etapa para o Amazon S3.

Os dados residem no bucket do S3 de acordo com a seguinte estrutura:

s3_root_uri/ pipeline_name/ sm_rf_user_ws/ workspace.zip # archive of the current working directory (workdir) step_name/ timestamp/ arguments/ # serialized function arguments function/ # serialized function pre_train_dependencies/ # any dependencies and pre_execution scripts provided for the step execution_id/ step_name/ results # returned output from the serialized function including the model

s3_root_urié definido no arquivo de SageMaker configuração e se aplica a todo o pipeline. Se indefinido, o SageMaker bucket padrão será usado.

nota

Sempre que SageMaker compila um pipeline, SageMaker salva as funções, argumentos e dependências serializados das etapas em uma pasta com a data e hora atual. Isso ocorre toda vez que você correpipeline.create(),pipeline.update(), pipeline.upsert() oupipeline.definition().