运行管道 - Amazon SageMaker

运行管道

使用 SageMaker Python SDK 创建管道定义后,您可以将其提交给 SageMaker 以启动执行。以下教程展示了如何提交管道、开始执行、检查执行结果以及删除管道。

先决条件

本教程要求以下项目:

  • SageMaker 笔记本实例。 

  • SageMaker Pipelines 管道定义。本教程假设您使用的是完成定义管道教程后创建的管道定义。

步骤 1:启动管道

首先,您需要启动管道。

启动管道
  1. 检查 JSON 管道定义以确保其格式正确。

    import json json.loads(pipeline.definition())
  2. 将管道定义提交给 SageMaker Pipelines 服务以创建管道(如果不存在),或更新管道(如果存在)。SageMaker Pipelines 使用传入的角色来创建步骤中定义的所有作业。

    pipeline.upsert(role_arn=role)
  3. 启动管道执行。

    execution = pipeline.start()

步骤 2:检查管道执行

接下来,您需要检查管道执行。

检查管道执行
  1. 描述管道执行状态,确保其已成功创建并启动。

    execution.describe()
  2. 等待执行完成。

    execution.wait()
  3. 列出执行步骤及其状态。

    execution.list_steps()

    您的输出应与以下内容类似:

    [{'StepName': 'AbaloneTransform', 'StartTime': datetime.datetime(2020, 11, 21, 2, 41, 27, 870000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 11, 21, 2, 45, 50, 492000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': ''}, 'Metadata': {'TransformJob': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:transform-job/pipelines-cfvy1tjuxdq8-abalonetransform-ptyjoef3jy'}}}, {'StepName': 'AbaloneRegisterModel', 'StartTime': datetime.datetime(2020, 11, 21, 2, 41, 26, 929000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 11, 21, 2, 41, 28, 15000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': ''}, 'Metadata': {'RegisterModel': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:model-package/abalonemodelpackagegroupname/1'}}}, {'StepName': 'AbaloneCreateModel', 'StartTime': datetime.datetime(2020, 11, 21, 2, 41, 26, 895000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 11, 21, 2, 41, 27, 708000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': ''}, 'Metadata': {'Model': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:model/pipelines-cfvy1tjuxdq8-abalonecreatemodel-jl94rai0ra'}}}, {'StepName': 'AbaloneMSECond', 'StartTime': datetime.datetime(2020, 11, 21, 2, 41, 25, 558000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 11, 21, 2, 41, 26, 329000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': ''}, 'Metadata': {'Condition': {'Outcome': 'True'}}}, {'StepName': 'AbaloneEval', 'StartTime': datetime.datetime(2020, 11, 21, 2, 37, 34, 767000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 11, 21, 2, 41, 18, 80000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': ''}, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:processing-job/pipelines-cfvy1tjuxdq8-abaloneeval-zfraozhmny'}}}, {'StepName': 'AbaloneTrain', 'StartTime': datetime.datetime(2020, 11, 21, 2, 34, 55, 867000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 11, 21, 2, 37, 34, 34000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': ''}, 'Metadata': {'TrainingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:training-job/pipelines-cfvy1tjuxdq8-abalonetrain-tavd6f3wdf'}}}, {'StepName': 'AbaloneProcess', 'StartTime': datetime.datetime(2020, 11, 21, 2, 30, 27, 160000, tzinfo=tzlocal()), 'EndTime': datetime.datetime(2020, 11, 21, 2, 34, 48, 390000, tzinfo=tzlocal()), 'StepStatus': 'Succeeded', 'CacheHitResult': {'SourcePipelineExecutionArn': ''}, 'Metadata': {'ProcessingJob': {'Arn': 'arn:aws:sagemaker:us-east-2:111122223333:processing-job/pipelines-cfvy1tjuxdq8-abaloneprocess-mgqyfdujcj'}}}]
  4. 管道执行完成后,从 Amazon S3 下载生成的 evaluation.json 文件以检查报告。

    evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] )) json.loads(evaluation_json)

步骤 3:覆盖管道执行的默认参数

您可以通过指定不同的管道参数来覆盖默认值,从而运行管道的其他执行。

覆盖默认参数
  1. 创建管道执行。这将启动另一个管道执行,并将模型批准状态覆盖设置为“已批准”。这意味着 RegisterModel 步骤生成的模型包版本已自动准备好通过 CI/CD 管道(例如使用 SageMaker 项目)进行部署。有关更多信息,请参阅 使用 SageMaker 项目自动执行 MLOps

    execution = pipeline.start( parameters=dict( ModelApprovalStatus="Approved", ) )
  2. 等待执行完成。

    execution.wait()
  3. 列出执行步骤及其状态。

    execution.list_steps()
  4. 管道执行完成后,从 Amazon S3 下载生成的 evaluation.json 文件以检查报告。

    evaluation_json = sagemaker.s3.S3Downloader.read_file("{}/evaluation.json".format( step_eval.arguments["ProcessingOutputConfig"]["Outputs"][0]["S3Output"]["S3Uri"] )) json.loads(evaluation_json)

步骤 4:停止并删除管道执行

完成管道后,您可以停止任何正在进行的执行并删除管道。

停止并删除管道执行
  1. 停止管道执行。

    execution.stop()
  2. 删除管道。

    pipeline.delete()