

# Pipelines overview
<a name="pipelines-overview"></a>

An Amazon SageMaker AI pipeline is a series of interconnected steps in directed acyclic graph (DAG) that are defined using the drag-and-drop UI or [Pipelines SDK](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html). You can also build your pipeline using the [pipeline definition JSON schema](https://aws-sagemaker-mlops.github.io/sagemaker-model-building-pipeline-definition-JSON-schema/). This DAG JSON definition gives information on the requirements and relationships between each step of your pipeline. The structure of a pipeline's DAG is determined by the data dependencies between steps. These data dependencies are created when the properties of a step's output are passed as the input to another step. The following image is an example of a pipeline DAG:

![\[\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/pipeline-full.png)


**The example DAG includes the following steps:**

1. `AbaloneProcess`, an instance of the [Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) step, runs a preprocessing script on the data used for training. For example, the script could fill in missing values, normalize numerical data, or split data into the train, validation, and test datasets.

1. `AbaloneTrain`, an instance of the [Training](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-training) step, configures hyperparameters and trains a model from the preprocessed input data.

1. `AbaloneEval`, another instance of the [Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-processing) step, evaluates the model for accuracy. This step shows an example of a data dependency—this step uses the test dataset output of the `AbaloneProcess`.

1. `AbaloneMSECond` is an instance of a [Condition](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-condition) step which, in this example, checks to make sure the mean-square-error result of model evaluation is below a certain limit. If the model does not meet the criteria, the pipeline run stops.

1. The pipeline run proceeds with the following steps:

   1. `AbaloneRegisterModel`, where SageMaker AI calls a [RegisterModel](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-register-model) step to register the model as a versioned model package group into the Amazon SageMaker Model Registry.

   1. `AbaloneCreateModel`, where SageMaker AI calls a [CreateModel](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-create-model) step to create the model in preparation for batch transform. In `AbaloneTransform`, SageMaker AI calls a [Transform](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-transform) step to generate model predictions on a dataset you specify.

The following topics describe fundamental Pipelines concepts. For a tutorial describing the implementation of these concepts, see [Pipelines actions](pipelines-build.md).

**Topics**
+ [Pipeline Structure and Execution](build-and-manage-pipeline.md)
+ [IAM Access Management](build-and-manage-access.md)
+ [Set up cross-account support for Pipelines](build-and-manage-xaccount.md)
+ [Pipeline parameters](build-and-manage-parameters.md)
+ [Pipelines steps](build-and-manage-steps.md)
+ [Lift-and-shift Python code with the @step decorator](pipelines-step-decorator.md)
+ [Pass Data Between Steps](build-and-manage-propertyfile.md)
+ [Caching pipeline steps](pipelines-caching.md)
+ [Retry Policy for Pipeline Steps](pipelines-retry-policy.md)
+ [Selective execution of pipeline steps](pipelines-selective-ex.md)
+ [Baseline calculation, drift detection and lifecycle with ClarifyCheck and QualityCheck steps in Amazon SageMaker Pipelines](pipelines-quality-clarify-baseline-lifecycle.md)
+ [Schedule Pipeline Runs](pipeline-eventbridge.md)
+ [Amazon SageMaker Experiments Integration](pipelines-experiments.md)
+ [Run pipelines using local mode](pipelines-local-mode.md)
+ [Troubleshooting Amazon SageMaker Pipelines](pipelines-troubleshooting.md)

# Pipeline Structure and Execution
<a name="build-and-manage-pipeline"></a>

**Topics**
+ [Pipeline Structure](#build-and-manage-pipeline-structure)
+ [Pipeline Execution using Parallelism Configuration](#build-and-manage-pipeline-execution)

## Pipeline Structure
<a name="build-and-manage-pipeline-structure"></a>

An Amazon SageMaker Pipelines instance is composed of a `name`, `parameters`, and `steps`. Pipeline names must be unique within an `(account, region)` pair. All parameters used in step definitions must be defined in the pipeline. Pipeline steps listed automatically determine their order of execution by their data dependencies on one another. The Pipelines service resolves the relationships between steps in the data dependency DAG to create a series of steps that the execution completes. The following is an example of a pipeline structure.

**Warning**  
When building a pipeline through the visual editor or SageMaker AI Python SDK, do not include sensitive information in pipeline parameters or any step definition field (such as environment variables). These fields will be visible in the future when returned in a `DescribePipeline` request.

```
from sagemaker.workflow.pipeline import Pipeline
  
  pipeline_name = f"AbalonePipeline"
  pipeline = Pipeline(
      name=pipeline_name,
      parameters=[
          processing_instance_type, 
          processing_instance_count,
          training_instance_type,
          model_approval_status,
          input_data,
          batch_data,
      ],
      steps=[step_process, step_train, step_eval, step_cond],
  )
```

## Pipeline Execution using Parallelism Configuration
<a name="build-and-manage-pipeline-execution"></a>

By default, a pipeline performs all steps that are available to run in parallel. You can control this behavior by using the `ParallelismConfiguration` property when creating or updating a pipeline, as well as when starting or retrying a pipeline execution. 

Parallelism configurations are applied per execution. For example, if two executions are started they can each run a maximum of 50 steps concurrently, for a total of 100 concurrently running steps. Also, `ParallelismConfiguration`(s) specified when starting, retrying or updating an execution take precedence over parallelism configurations defined in the pipeline.

**Example Creating a pipeline execution with `ParallelismConfiguration`**  

```
pipeline = Pipeline(
        name="myPipeline",
        steps=[step_process, step_train]
    )

  pipeline.create(role, parallelism_config={"MaxParallelExecutionSteps": 50})
```

# IAM Access Management
<a name="build-and-manage-access"></a>

The following sections describe the AWS Identity and Access Management (IAM) requirements for Amazon SageMaker Pipelines. For an example of how you can implement these permissions, see [Prerequisites](define-pipeline.md#define-pipeline-prereq).

**Topics**
+ [Pipeline Role Permissions](#build-and-manage-role-permissions)
+ [Pipeline Step Permissions](#build-and-manage-step-permissions)
+ [CORS configuration with Amazon S3 buckets](#build-and-manage-cors-s3)
+ [Customize access management for Pipelines jobs](#build-and-manage-step-permissions-prefix)
+ [Customize access to pipeline versions](#build-and-manage-step-permissions-version)
+ [Service Control Policies with Pipelines](#build-and-manage-scp)

## Pipeline Role Permissions
<a name="build-and-manage-role-permissions"></a>

Your pipeline requires an IAM pipeline execution role that is passed to Pipelines when you create a pipeline. The role for the SageMaker AI instance you're using to create the pipeline must have a policy with the `iam:PassRole` permission that specifies the pipeline execution role. This is because the instance needs permission to pass your pipeline execution role to the Pipelines service for use in creating and running pipelines. For more information on IAM roles, see [IAM Roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html).

Your pipeline execution role requires the following permissions:
+ You can use a unique or customized role for any of the SageMaker AI job steps in your pipeline (rather than the pipeline execution role, which is used by default). Make sure that your pipeline execution role has added a policy with the `iam:PassRole` permission that specifies each of these roles.
+  `Create` and `Describe` permissions for each of the job types in the pipeline. 
+  Amazon S3 permissions to use the `JsonGet` function. You control access to your Amazon S3 resources using resource-based policies and identity-based policies. A resource-based policy is applied to your Amazon S3 bucket and grants Pipelines access to the bucket. An identity-based policy gives your pipeline the ability to make Amazon S3 calls from your account. For more information on resource-based policies and identity-based policies, see [Identity-based policies and resource-based policies](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html). 

  ```
  {
      "Action": [
          "s3:GetObject"
      ],
      "Resource": "arn:aws:s3:::<your-bucket-name>/*",
      "Effect": "Allow"
  }
  ```

## Pipeline Step Permissions
<a name="build-and-manage-step-permissions"></a>

Pipelines include steps that run SageMaker AI jobs. In order for the pipeline steps to run these jobs, they require an IAM role in your account that provides access for the needed resource. This role is passed to the SageMaker AI service principal by your pipeline. For more information on IAM roles, see [IAM Roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles.html). 

By default, each step takes on the pipeline execution role. You can optionally pass a different role to any of the steps in your pipeline. This ensures that the code in each step does not have the ability to impact resources used in other steps unless there is a direct relationship between the two steps specified in the pipeline definition. You pass these roles when defining the processor or estimator for your step. For examples of how to include these roles in these definitions, see the [SageMaker AI Python SDK documentation](https://sagemaker.readthedocs.io/en/stable/overview.html#using-estimators). 

## CORS configuration with Amazon S3 buckets
<a name="build-and-manage-cors-s3"></a>

To ensure your images are imported into your Pipelines from an Amazon S3 bucket in a predictable manner, a CORS configuration must be added to Amazon S3 buckets where images are imported from. This section provides instructions on how to set the required CORS configuration to your Amazon S3 bucket. The XML `CORSConfiguration` required for Pipelines differs from the one in [CORS Requirement for Input Image Data](sms-cors-update.md), otherwise you can use the information there to learn more about the CORS requirement with Amazon S3 buckets.

Use the following CORS configuration code for the Amazon S3 buckets that host your images. For instructions on configuring CORS, see [Configuring cross-origin resource sharing (CORS)](https://docs.aws.amazon.com/AmazonS3/latest/user-guide/add-cors-configuration.html) in the Amazon Simple Storage Service User Guide. If you use the Amazon S3 console to add the policy to your bucket, you must use the JSON format.

**JSON**

```
[
    {
        "AllowedHeaders": [
            "*"
        ],
        "AllowedMethods": [
            "PUT"
        ],
        "AllowedOrigins": [
            "*"
        ],
        "ExposeHeaders": [
            "Access-Control-Allow-Origin"
        ]
    }
]
```

**XML**

```
<CORSConfiguration>
 <CORSRule>
   <AllowedHeader>*</AllowedHeader>
   <AllowedOrigin>*</AllowedOrigin>
   <AllowedMethod>PUT</AllowedMethod>
   <ExposeHeader>Access-Control-Allow-Origin</ExposeHeader>
 </CORSRule>
</CORSConfiguration>
```

The following GIF demonstrates the instructions found in the Amazon S3 documentation to add a CORS header policy using the Amazon S3 console.

![\[Gif on how to add a CORS header policy using the Amazon S3 console.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/sms/gifs/cors-config.gif)


## Customize access management for Pipelines jobs
<a name="build-and-manage-step-permissions-prefix"></a>

You can further customize your IAM policies so selected members in your organization can run any or all pipeline steps. For example, you can give certain users permission to create training jobs, and another group of users permission to create processing jobs, and all of your users permission to run the remaining steps. To use this feature, you select a custom string which prefixes your job name. Your admin prepends the permitted ARNs with the prefix while your data scientist includes this prefix in pipeline instantiations. Because the IAM policy for permitted users contains a job ARN with the specified prefix, subsequent jobs of your pipeline step have necessary permissions to proceed. Job prefixing is off by default—you must toggle on this option in your `Pipeline` class to use it. 

For jobs with prefixing turned off, the job name is formatted as shown and is a concatenation of fields described in the following table:

`pipelines-<executionId>-<stepNamePrefix>-<entityToken>-<failureCount>`


| Field | Definition | 
| --- | --- | 
|  pipelines   |  A static string always prepended. This string identifies the pipeline orchestration service as the job's source.  | 
|  executionId  |  A randomized buffer for the running instance of the pipeline.  | 
|  stepNamePrefix  |  The user-specified step name (given in the `name` argument of the pipeline step), limited to the first 20 characters.  | 
|  entityToken  |  A randomized token to ensure idempotency of the step entity.  | 
|  failureCount  |  The current number of retries attempted to complete the job.  | 

In this case, no custom prefix is prepended to the job name, and the corresponding IAM policy must match this string.

For users who turn on job prefixing, the underlying job name takes the following form, with the custom prefix specified as `MyBaseJobName`:

*<MyBaseJobName>*-*<executionId>*-*<entityToken>*-*<failureCount>*

The custom prefix replaces the static `pipelines` string to help you narrow the selection of users who can run the SageMaker AI job as a part of a pipeline.

**Prefix length restrictions**

The job names have internal length constraints specific to individual pipeline steps. This constraint also limits the length of the allowed prefix. The prefix length requirements are as follows:


| Pipeline step | Prefix length | 
| --- | --- | 
|   `[TrainingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#trainingstep)`, `[ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-collections)`, `[TransformStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#transformstep)`, `[ProcessingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#processingstep)`, `[ClarifyCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#clarifycheckstep)`, `[QualityCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#qualitycheckstep)`, `[RegisterModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-collections)`   |  38  | 
|  `[TuningStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#tuningstep)`, `[AutoML](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#automlstep)`  |  6  | 

### Apply job prefixes to an IAM policy
<a name="build-and-manage-step-permissions-prefix-iam"></a>

Your admin creates IAM policies allowing users of specific prefixes to create jobs. The following example policy permits data scientists to create training jobs if they use the `MyBaseJobName` prefix. 

```
{
    "Action": "sagemaker:CreateTrainingJob",
    "Effect": "Allow",
    "Resource": [
        "arn:aws:sagemaker:region:account-id:*/MyBaseJobName-*"
    ]
}
```

### Apply job prefixes to pipeline instantiations
<a name="build-and-manage-step-permissions-prefix-inst"></a>

You specify your prefix with the `*base_job_name` argument of the job instance class.

**Note**  
You pass your job prefix with the `*base_job_name` argument to the job instance before creating a pipeline step. This job instance contains the necessary information for the job to run as a step in a pipeline. This argument varies depending upon the job instance used. The following list shows which argument to use for each pipeline step type:  
`base_job_name` for the `[Estimator](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html)` (`[TrainingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#trainingstep)`), `[Processor](https://sagemaker.readthedocs.io/en/stable/api/training/processing.html)` (`[ProcessingStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#processingstep)`), and `[AutoML](https://sagemaker.readthedocs.io/en/stable/api/training/automl.html)` (`[AutoMLStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#automlstep)`) classes
`tuning_base_job_name` for the `[Tuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html)` class (`[TuningStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#tuningstep)`)
`transform_base_job_name` for the `[Transformer](https://sagemaker.readthedocs.io/en/stable/api/inference/transformer.html)` class (`[TransformStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#transformstep)`)
`base_job_name` of `[CheckJobConfig](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#checkjobconfig)` for the `[QualityCheckStep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#qualitycheckstep)` (Quality Check) and `[ClarifyCheckstep](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#clarifycheckstep)` (Clarify Check) classes
For the `[Model](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html)` class, the argument used depends on if you run `create` or `register` on your model before passing the result to `[ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#step-collections)`  
If you call `create`, the custom prefix comes from the `name` argument when you construct your model (i.e., `Model(name=)`)
If you call `register`, the custom prefix comes from the `model_package_name` argument of your call to `register` (i.e., `my_model.register(model_package_name=)`)

The following example shows how to specify a prefix for a new training job instance.

```
# Create a job instance
xgb_train = Estimator(
    image_uri=image_uri,
    instance_type="ml.m5.xlarge",
    instance_count=1,
    output_path=model_path,
    role=role,
    subnets=["subnet-0ab12c34567de89f0"],
    base_job_name="MyBaseJobName"
    security_group_ids=["sg-1a2bbcc3bd4444e55"],
    tags = [ ... ]
    encrypt_inter_container_traffic=True, 
)

# Attach your job instance to a pipeline step
step_train = TrainingStep(
    name="TestTrainingJob",
    estimator=xgb_train, 
    inputs={
        "train": TrainingInput(...), 
        "validation": TrainingInput(...) 
    }
)
```

Job prefixing is off by default. To opt into this feature, use the `use_custom_job_prefix` option of `PipelineDefinitionConfig` as shown in the following snippet:

```
from sagemaker.workflow.pipeline_definition_config import PipelineDefinitionConfig
        
# Create a definition configuration and toggle on custom prefixing
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True);

# Create a pipeline with a custom prefix
 pipeline = Pipeline(
     name="MyJobPrefixedPipeline",
     parameters=[...]
     steps=[...]
     pipeline_definition_config=definition_config
)
```

Create and run your pipeline. The following example creates and runs a pipeline, and also demonstrates how you can turn off job prefixing and rerun your pipeline.

```
pipeline.create(role_arn=sagemaker.get_execution_role())

# Optionally, call definition() to confirm your prefixed job names are in the built JSON
pipeline.definition()
pipeline.start()
      
# To run a pipeline without custom-prefixes, toggle off use_custom_job_prefix, update the pipeline 
# via upsert() or update(), and start a new run
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=False)
pipeline.pipeline_definition_config = definition_config
pipeline.update()
execution = pipeline.start()
```

Similarly, you can toggle the feature on for existing pipelines and start a new run which uses job prefixes.

```
definition_config = PipelineDefinitionConfig(use_custom_job_prefix=True)
pipeline.pipeline_definition_config = definition_config
pipeline.update()
execution = pipeline.start()
```

Finally, you can view your custom-prefixed job by calling `list_steps` on the pipeline execution.

```
steps = execution.list_steps()

prefixed_training_job_name = steps['PipelineExecutionSteps'][0]['Metadata']['TrainingJob']['Arn']
```

## Customize access to pipeline versions
<a name="build-and-manage-step-permissions-version"></a>

You can grant customized access to specific versions of Amazon SageMaker Pipelines by using the `sagemaker:PipelineVersionId` condition key. For example, the policy below grants access to start executions or update pipeline version only for version ID 6 and above.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": {
        "Sid": "AllowStartPipelineExecution",
        "Effect": "Allow",
        "Action": [
            "sagemaker:StartPipelineExecution",
            "sagemaker:UpdatePipelineVersion"
        ],
        "Resource": "*",
        "Condition": {
            "NumericGreaterThanEquals": {
                "sagemaker:PipelineVersionId": 6
            }
        }
    }
}
```

------

For more information about supported condition keys, see [Condition keys for Amazon SageMaker AI](https://docs.aws.amazon.com//service-authorization/latest/reference/list_amazonsagemaker.html#amazonsagemaker-policy-keys).

## Service Control Policies with Pipelines
<a name="build-and-manage-scp"></a>

Service control policies (SCPs) are a type of organization policy that you can use to manage permissions in your organization. SCPs offer central control over the maximum available permissions for all accounts in your organization. By using Pipelines within your organization, you can ensure that data scientists manage your pipeline executions without having to interact with the AWS console. 

If you're using a VPC with your SCP that restricts access to Amazon S3, you need to take steps to allow your pipeline to access other Amazon S3 resources. 

To allow Pipelines to access Amazon S3 outside of your VPC with the `JsonGet` function, update your organization's SCP to ensure that the role using Pipelines can access Amazon S3. To do this, create an exception for roles that are being used by the Pipelines executor via the pipeline execution role using a principal tag and condition key. 

**To allow Pipelines to access Amazon S3 outside of your VPC**

1. Create a unique tag for your pipeline execution role following the steps in [Tagging IAM users and roles](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_tags.html). 

1. Grant an exception in your SCP using the `Aws:PrincipalTag IAM` condition key for the tag you created. For more information, see [Creating, updating, and deleting service control policies](https://docs.aws.amazon.com/organizations/latest/userguide/orgs_manage_policies_scps_create.html). 

# Set up cross-account support for Pipelines
<a name="build-and-manage-xaccount"></a>

Cross-account support for Amazon SageMaker Pipelines enables you to collaborate on machine learning pipelines with other teams or organizations that operate in different AWS accounts. By setting up cross-account pipeline sharing, you can grant controlled access to pipelines, allow other accounts to view pipeline details, trigger executions, and monitor runs. The following topic covers how to set up cross-account pipeline sharing, the different permission policies available for shared resources, and how to access and interact with shared pipeline entities through direct API calls to SageMaker AI.

## Set up cross-account pipeline sharing
<a name="build-and-manage-xaccount-set-up"></a>

SageMaker AI uses [AWS Resource Access Manager](https://docs.aws.amazon.com/ram/latest/userguide/what-is.html) (AWS RAM) to help you securely share your pipeline entities across accounts. 

### Create a resource share
<a name="build-and-manage-xaccount-set-up-console"></a>

1. Select **Create a resource share** through the [AWS RAM console](https://console.aws.amazon.com/ram/home).

1. When specifying resource share details, choose the Pipelines resource type and select one or more pipelines that you want to share. When you share a pipeline with any other account, all of its executions are also shared implicitly.

1. Associate permissions with your resource share. Choose either the default read-only permission policy or the extended pipeline execution permission policy. For more detailed information, see [Permission policies for Pipelines resources](#build-and-manage-xaccount-permissions). 
**Note**  
If you select the extended pipeline execution policy, note that any start, stop, and retry commands called by shared accounts use resources in the AWS account that shared the pipeline.

1. Use AWS account IDs to specify the accounts to which you want to grant access to your shared resources.

1. Review your resource share configuration and select **Create resource share**. It may take a few minutes for the resource share and principal associations to complete.

For more information, see [Sharing your AWS resources](https://docs.aws.amazon.com/ram/latest/userguide/getting-started-sharing.html) in the *AWS Resource Access Manager User Guide*.

### Get responses to your resource share invitation
<a name="build-and-manage-xaccount-set-up-responses"></a>

Once the resource share and principal associations are set, the specified AWS accounts receive an invitation to join the resource share. The AWS accounts must accept the invite to gain access to any shared resources.

For more information on accepting a resource share invite through AWS RAM, see [Using shared AWS resources ](https://docs.aws.amazon.com/ram/latest/userguide/getting-started-shared.html)in the *AWS Resource Access Manager User Guide*.

## Permission policies for Pipelines resources
<a name="build-and-manage-xaccount-permissions"></a>

When creating your resource share, choose one of two supported permission policies to associate with the SageMaker AI pipeline resource type. Both policies grant access to any selected pipeline and all of its executions. 

### Default read-only permissions
<a name="build-and-manage-xaccount-permissions-default"></a>

The `AWSRAMDefaultPermissionSageMakerPipeline` policy allows the following read-only actions:

```
"sagemaker:DescribePipeline"
"sagemaker:DescribePipelineDefinitionForExecution"   
"sagemaker:DescribePipelineExecution"
"sagemaker:ListPipelineExecutions"
"sagemaker:ListPipelineExecutionSteps"
"sagemaker:ListPipelineParametersForExecution"
"sagemaker:Search"
```

### Extended pipeline execution permissions
<a name="build-and-manage-xaccount-permissions-extended"></a>

The `AWSRAMPermissionSageMakerPipelineAllowExecution` policy includes all of the read-only permissions from the default policy and also allows shared accounts to start, stop, and retry pipeline executions.

**Note**  
Be mindful of AWS resource usage when using the extended pipeline execution permission policy. With this policy, shared accounts are allowed to start, stop, and retry pipeline executions. Any resources used for shared pipeline executions are consumed by the owner account. 

The extended pipeline execution permission policy allows the following actions:

```
"sagemaker:DescribePipeline"
"sagemaker:DescribePipelineDefinitionForExecution"   
"sagemaker:DescribePipelineExecution"
"sagemaker:ListPipelineExecutions"
"sagemaker:ListPipelineExecutionSteps"
"sagemaker:ListPipelineParametersForExecution"
"sagemaker:StartPipelineExecution"
"sagemaker:StopPipelineExecution"
"sagemaker:RetryPipelineExecution"
"sagemaker:Search"
```

## Access shared pipeline entities through direct API calls
<a name="build-and-manage-xaccount-api-calls"></a>

Once cross-account pipeline sharing is set up, you can call the following SageMaker API actions using a pipeline ARN:

**Note**  
You can only call API commands if they are included in the permissions associated with your resource share. If you select the `AWSRAMPermissionSageMakerPipelineAllowExecution` policy, then the start, stop, and retry commands use resources in the AWS account that shared the pipeline.
+ [DescribePipeline](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipeline.html)
+ [DescribePipelineDefinitionForExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineDefinitionForExecution.html)
+ [DescribePipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribePipelineExecution.html)
+ [ListPipelineExecutions](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineExecutions.html)
+ [ListPipelineExecutionSteps](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineExecutionSteps.html)
+ [ListPipelineParametersForExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_ListPipelineParametersForExecution.html)
+ [StartPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StartPipelineExecution.html)
+ [StopPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StopPipelineExecution.html)
+ [RetryPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_RetryPipelineExecution.html)

# Pipeline parameters
<a name="build-and-manage-parameters"></a>

You can introduce variables into your pipeline definition using parameters. You can reference parameters that you define throughout your pipeline definition. Parameters have a default value, which you can override by specifying parameter values when starting a pipeline execution. The default value must be an instance matching the parameter type. All parameters used in step definitions must be defined in your pipeline definition. This topic describes the parameters that you can define and how to implement them.

Amazon SageMaker Pipelines supports the following parameter types: 
+  `ParameterString` – Representing a string parameter. 
+  `ParameterInteger` – Representing an integer parameter. 
+  `ParameterFloat` – Representing a float parameter.
+  `ParameterBoolean` – Representing a Boolean Python type.

Parameters take the following format:

```
<parameter> = <parameter_type>(
    name="<parameter_name>",
    default_value=<default_value>
)
```

The following example shows a sample parameter implementation.

```
from sagemaker.workflow.parameters import (
    ParameterInteger,
    ParameterString,
    ParameterFloat,
    ParameterBoolean
)

processing_instance_count = ParameterInteger(
    name="ProcessingInstanceCount",
    default_value=1
)
```

You pass the parameter when creating your pipeline as shown in the following example.

```
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[
        processing_instance_count
    ],
    steps=[step_process]
)
```

You can also pass a parameter value that differs from the default value to a pipeline execution, as shown in the following example.

```
execution = pipeline.start(
    parameters=dict(
        ProcessingInstanceCount="2",
        ModelApprovalStatus="Approved"
    )
)
```

You can manipulate parameters with SageMaker Python SDK functions like `[ sagemaker.workflow.functions.Join](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.functions.Join)`. For more information on parameters, see [ SageMaker Pipelines Parameters](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parameters).

For known limitations of Pipelines Parameters, see *[Limitations - Parameterization](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#parameterization)* in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).

# Pipelines steps
<a name="build-and-manage-steps"></a>

Pipelines are composed of steps. These steps define the actions that the pipeline takes and the relationships between steps using properties. The following page describes the types of steps, their properties, and the relationships between them.

**Topics**
+ [Add a step](build-and-manage-steps-types.md)
+ [Add integration](build-and-manage-steps-integration.md)
+ [Step properties](#build-and-manage-properties)
+ [Step parallelism](#build-and-manage-parallelism)
+ [Data dependency between steps](#build-and-manage-data-dependency)
+ [Custom dependency between steps](#build-and-manage-custom-dependency)
+ [Custom images in a step](#build-and-manage-images)

# Add a step
<a name="build-and-manage-steps-types"></a>

The following describes the requirements of each step type and provides an example implementation of the step, as well as how to add the step to a Pipelines. These are not working implementations because they don't provide the resource and inputs needed. For a tutorial that implements these steps, see [Pipelines actions](pipelines-build.md).

**Note**  
You can also create a step from your local machine learning code by converting it to a Pipelines step with the `@step` decorator. For more information, see [@step decorator](#step-type-custom).

Amazon SageMaker Pipelines support the following step types:
+ [Execute code](#step-type-executecode)

  [Processing](#step-type-processing)
+ [Training](#step-type-training)
+ [Tuning](#step-type-tuning)
+ [AutoML](#step-type-automl)
+ [`Model`](#step-type-model)
+ [`Create model`](#step-type-create-model)
+ [`Register model`](#step-type-register-model)
+ [`Deploy model (endpoint)`](#step-type-deploy-model-endpoint)
+ [Transform](#step-type-transform)
+ [Condition](#step-type-condition)
+ [`Callback`](#step-type-callback)
+ [Lambda](#step-type-lambda)
+ [`ClarifyCheck`](#step-type-clarify-check)
+ [`QualityCheck`](#step-type-quality-check)
+ [EMR](#step-type-emr)
+ [Notebook Job](#step-type-notebook-job)
+ [Fail](#step-type-fail)

## @step decorator
<a name="step-type-custom"></a>

If you want to orchestrate a custom ML job that leverages advanced SageMaker AI features or other AWS services in the drag-and-drop Pipelines UI, use the [Execute code step](#step-type-executecode).

You can create a step from local machine learning code using the `@step` decorator. After you test your code, you can convert the function to a SageMaker AI pipeline step by annotating it with the `@step` decorator. Pipelines creates and runs a pipeline when you pass the output of the `@step`-decorated function as a step to your pipeline. You can also create a multi-step DAG pipeline that includes one or more `@step`-decorated functions as well as traditional SageMaker AI pipeline steps. For more details about how to create a step with `@step` decorator, see [Lift-and-shift Python code with the @step decorator](pipelines-step-decorator.md).

## Execute code step
<a name="step-type-executecode"></a>

In the Pipelines drag-and-drop UI, you can use an **Execute code** step to run your own code as a pipeline step. You can upload a Python function, script, or notebook to be executed as part of your pipeline. You should use this step if you want to orchestrate a custom ML job that leverages advanced SageMaker AI features or other AWS services.

The **Execute Code** step uploads files to your default Amazon S3 bucket for Amazon SageMaker AI. This bucket might not have the required Cross-Origin Resource Sharing (CORS) permissions set. To learn more about configuring CORS permissions, see [CORS Requirement for Input Image Data](sms-cors-update.md).

The **Execute Code** step uses an Amazon SageMaker training job to run your code. Ensure that your IAM role has the `sagemaker:DescribeTrainingJob` and `sagemaker:CreateTrainingJob` API permissions. To learn more about all the required permissions for Amazon SageMaker AI and how to set them up, see [Amazon SageMaker AI API Permissions: Actions, Permissions, and Resources Reference](api-permissions-reference.md).

To add an execute code step to a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Execute code** and drag it to the canvas.

1. In the canvas, choose the **Execute code** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs.

1. You can upload a single file to execute or upload a compressed folder containing multiple artifacts.

1. For single file uploads, you can provide optional parameters for notebooks, python functions, or scripts.

1. When providing Python functions, a handler must be provided in the format `file.py:<function_name>`

1. For compressed folder uploads, relative paths to your code must be provided, and you can optionally provide paths to a `requirements.txt` file or initialization script inside the compressed folder.

1. If the canvas includes any step that immediately precedes the **Execute code** step you added, click and drag the cursor from the step to the **Execute code** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Execute code** step you added, click and drag the cursor from the **Execute code** step to the step to create an edge. Outputs from **Execute code** steps can be referenced for Python functions.

## Processing step
<a name="step-type-processing"></a>

Use a processing step to create a processing job for data processing. For more information on processing jobs, see [Process Data and Evaluate Models](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html).

------
#### [ Pipeline Designer ]

To add a processing step to a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. In the left sidebar, choose **Process data** and drag it to the canvas.

1. In the canvas, choose the **Process data** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.steps.ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep).

1. If the canvas includes any step that immediately precedes the **Process data** step you added, click and drag the cursor from the step to the **Process data** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Process data** step you added, click and drag the cursor from the **Process data** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

A processing step requires a processor, a Python script that defines the processing code, outputs for processing, and job arguments. The following example shows how to create a `ProcessingStep` definition. 

```
from sagemaker.sklearn.processing import SKLearnProcessor

sklearn_processor = SKLearnProcessor(framework_version='1.0-1',
                                     role=<role>,
                                     instance_type='ml.m5.xlarge',
                                     instance_count=1)
```

```
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

inputs = [
    ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),
]

outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args = sklearn_processor.run(inputs=inputs, outputs=outputs,
        code="abalone/preprocessing.py")
)
```

**Pass runtime parameters**

The following example shows how to pass runtime parameters from a PySpark processor to a `ProcessingStep`.

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.spark.processing import PySparkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

pipeline_session = PipelineSession()

pyspark_processor = PySparkProcessor(
    framework_version='2.4',
    role=<role>,
    instance_type='ml.m5.xlarge',
    instance_count=1,
    sagemaker_session=pipeline_session,
)

step_args = pyspark_processor.run(
    inputs=[ProcessingInput(source=<input_data>, destination="/opt/ml/processing/input"),],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
    ],
    code="preprocess.py",
    arguments=None,
)


step_process = ProcessingStep(
    name="AbaloneProcess",
    step_args=step_args,
)
```

For more information on processing step requirements, see the [sagemaker.workflow.steps.ProcessingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.ProcessingStep) documentation. For an in-depth example, see the [Orchestrate Jobs to Train and Evaluate Models with Amazon SageMaker Pipelines](https://github.com/aws/amazon-sagemaker-examples/blob/62de6a1fca74c7e70089d77e36f1356033adbe5f/sagemaker-pipelines/tabular/abalone_build_train_deploy/sagemaker-pipelines-preprocess-train-evaluate-batch-transform.ipynb) example notebook. The *Define a Processing Step for Feature Engineering* section includes more information.

------

## Training step
<a name="step-type-training"></a>

You use a training step to create a training job to train a model. For more information on training jobs, see [Train a Model with Amazon SageMaker AI](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html).

A training step requires an estimator, as well as training and validation data inputs.

------
#### [ Pipeline Designer ]

To add a training step to a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Train model** and drag it to the canvas.

1. In the canvas, choose the **Train model** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.steps.TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep).

1. If the canvas includes any step that immediately precedes the **Train model** step you added, click and drag the cursor from the step to the **Train model** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Train model** step you added, click and drag the cursor from the **Train model** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

The following example shows how to create a `TrainingStep` definition. For more information about training step requirements, see the [sagemaker.workflow.steps.TrainingStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TrainingStep) documentation.

```
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TrainingStep

from sagemaker.xgboost.estimator import XGBoost

pipeline_session = PipelineSession()

xgb_estimator = XGBoost(..., sagemaker_session=pipeline_session)

step_args = xgb_estimator.fit(
    inputs={
        "train": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "train"
            ].S3Output.S3Uri,
            content_type="text/csv"
        ),
        "validation": TrainingInput(
            s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
                "validation"
            ].S3Output.S3Uri,
            content_type="text/csv"
        )
    }
)

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=step_args,
)
```

------

## Tuning step
<a name="step-type-tuning"></a>

You use a tuning step to create a hyperparameter tuning job, also known as hyperparameter optimization (HPO). A hyperparameter tuning job runs multiple training jobs, with each job producing a model version. For more information on hyperparameter tuning, see [Automatic model tuning with SageMaker AI](automatic-model-tuning.md).

The tuning job is associated with the SageMaker AI experiment for the pipeline, with the training jobs created as trials. For more information, see [Experiments Integration](pipelines-experiments.md).

A tuning step requires a [HyperparameterTuner](https://sagemaker.readthedocs.io/en/stable/api/training/tuner.html) and training inputs. You can retrain previous tuning jobs by specifying the `warm_start_config` parameter of the `HyperparameterTuner`. For more information on hyperparameter tuning and warm start, see [Run a Warm Start Hyperparameter Tuning Job](automatic-model-tuning-warm-start.md).

You use the [get\$1top\$1model\$1s3\$1uri](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep.get_top_model_s3_uri) method of the [sagemaker.workflow.steps.TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep) class to get the model artifact from one of the top-performing model versions. For a notebook that shows how to use a tuning step in a SageMaker AI pipeline, see [sagemaker-pipelines-tuning-step.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/tuning-step/sagemaker-pipelines-tuning-step.ipynb).

**Important**  
Tuning steps were introduced in Amazon SageMaker Python SDK v2.48.0 and Amazon SageMaker Studio Classic v3.8.0. You must update Studio Classic before you use a tuning step or the pipeline DAG doesn't display. To update Studio Classic, see [Shut Down and Update Amazon SageMaker Studio Classic](studio-tasks-update-studio.md).

The following example shows how to create a `TuningStep` definition.

```
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.tuner import HyperparameterTuner
from sagemaker.inputs import TrainingInput
from sagemaker.workflow.steps import TuningStep

tuner = HyperparameterTuner(..., sagemaker_session=PipelineSession())
    
step_tuning = TuningStep(
    name = "HPTuning",
    step_args = tuner.fit(inputs=TrainingInput(s3_data="s3://amzn-s3-demo-bucket/my-data"))
)
```

**Get the best model version**

The following example shows how to get the best model version from the tuning job using the `get_top_model_s3_uri` method. At most, the top 50 performing versions are available ranked according to [HyperParameterTuningJobObjective](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_HyperParameterTuningJobObjective.html). The `top_k` argument is an index into the versions, where `top_k=0` is the best-performing version and `top_k=49` is the worst-performing version.

```
best_model = Model(
    image_uri=image_uri,
    model_data=step_tuning.get_top_model_s3_uri(
        top_k=0,
        s3_bucket=sagemaker_session.default_bucket()
    ),
    ...
)
```

For more information on tuning step requirements, see the [sagemaker.workflow.steps.TuningStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TuningStep) documentation.

## Fine-tuning step
<a name="step-type-fine-tuning"></a>

Fine-tuning trains a pretrained foundation model from Amazon SageMaker JumpStart on a new dataset. This process, also known as transfer learning, can produce accurate models with smaller datasets and less training time. When you fine-tune a model, you can use the default dataset or choose your own data. To learn more about fine-tuning a foundation model from JumpStart, see [Fine-Tune a Model](jumpstart-fine-tune.md).

The fine-tuning step uses an Amazon SageMaker training job to customize your model. Ensure that your IAM role has the `sagemaker:DescribeTrainingJob` and `sagemaker:CreateTrainingJob` API permissions to execute the fine-tuning job in your pipeline. To learn more about the required permissions for Amazon SageMaker AI and how to set them up, see [Amazon SageMaker AI API Permissions: Actions, Permissions, and Resources Reference](api-permissions-reference.md).

To add a **Fine-tune model** step to your pipeline using the drag-and-drop editor, follow these steps:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Fine-tune model** and drag it to the canvas.

1. In the canvas, choose the **Fine-tune model** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs.

1. If the canvas includes any step that immediately precedes the **Fine-tune model** step you added, click and drag the cursor from the step to the **Fine-tune model** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Fine-tune model** step you added, click and drag the cursor from the **Fine-tune model** step to the step to create an edge.

## AutoML step
<a name="step-type-automl"></a>

Use the [AutoML](https://sagemaker.readthedocs.io/en/stable/api/training/automl.html) API to create an AutoML job to automatically train a model. For more information on AutoML jobs, see [Automate model development with Amazon SageMaker Autopilot](https://docs.aws.amazon.com/sagemaker/latest/dg/autopilot-automate-model-development.html). 

**Note**  
Currently, the AutoML step supports only [ensembling training mode](https://docs.aws.amazon.com/sagemaker/latest/dg/autopilot-model-support-validation.html).

The following example shows how to create a definition using `AutoMLStep`.

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.automl_step import AutoMLStep

pipeline_session = PipelineSession()

auto_ml = AutoML(...,
    role="<role>",
    target_attribute_name="my_target_attribute_name",
    mode="ENSEMBLING",
    sagemaker_session=pipeline_session) 

input_training = AutoMLInput(
    inputs="s3://amzn-s3-demo-bucket/my-training-data",
    target_attribute_name="my_target_attribute_name",
    channel_type="training",
)
input_validation = AutoMLInput(
    inputs="s3://amzn-s3-demo-bucket/my-validation-data",
    target_attribute_name="my_target_attribute_name",
    channel_type="validation",
)

step_args = auto_ml.fit(
    inputs=[input_training, input_validation]
)

step_automl = AutoMLStep(
    name="AutoMLStep",
    step_args=step_args,
)
```

**Get the best model version**

The AutoML step automatically trains several model candidates. Get the model with the best objective metric from the AutoML job using the `get_best_auto_ml_model` method as follows. You must also use an IAM `role` to access model artifacts.

```
best_model = step_automl.get_best_auto_ml_model(role=<role>)
```

For more information, see the [AutoML](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.automl_step.AutoMLStep) step in the SageMaker Python SDK.

## Model step
<a name="step-type-model"></a>

Use a `ModelStep` to create or register a SageMaker AI model. For more information on `ModelStep` requirements, see the [sagemaker.workflow.model\$1step.ModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.model_step.ModelStep) documentation.

### Create a model
<a name="step-type-model-create"></a>

You can use a `ModelStep` to create a SageMaker AI model. A `ModelStep` requires model artifacts and information about the SageMaker AI instance type that you need to use to create the model. For more information about SageMaker AI models, see [Train a Model with Amazon SageMaker AI](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html).

The following example shows how to create a `ModelStep` definition.

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.model import Model
from sagemaker.workflow.model_step import ModelStep

step_train = TrainingStep(...)
model = Model(
    image_uri=pytorch_estimator.training_image_uri(),
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    sagemaker_session=PipelineSession(),
    role=role,
)

step_model_create = ModelStep(
   name="MyModelCreationStep",
   step_args=model.create(instance_type="ml.m5.xlarge"),
)
```

### Register a model
<a name="step-type-model-register"></a>

You can use a `ModelStep` to register a `sagemaker.model.Model` or a `sagemaker.pipeline.PipelineModel` with the Amazon SageMaker Model Registry. A `PipelineModel` represents an inference pipeline, which is a model composed of a linear sequence of containers that process inference requests. For more information about how to register a model, see [Model Registration Deployment with Model Registry](model-registry.md).

The following example shows how to create a `ModelStep` that registers a `PipelineModel`.

```
import time

from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.sklearn import SKLearnModel
from sagemaker.xgboost import XGBoostModel

pipeline_session = PipelineSession()

code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)

sklearn_model = SKLearnModel(
   model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri,
   entry_point='inference.py',
   source_dir='sklearn_source_dir/',
   code_location=code_location,
   framework_version='1.0-1',
   role=role,
   sagemaker_session=pipeline_session,
   py_version='py3'
)

xgboost_model = XGBoostModel(
   model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
   entry_point='inference.py',
   source_dir='xgboost_source_dir/',
   code_location=code_location,
   framework_version='0.90-2',
   py_version='py3',
   sagemaker_session=pipeline_session,
   role=role
)

from sagemaker.workflow.model_step import ModelStep
from sagemaker import PipelineModel

pipeline_model = PipelineModel(
   models=[sklearn_model, xgboost_model],
   role=role,sagemaker_session=pipeline_session,
)

register_model_step_args = pipeline_model.register(
    content_types=["application/json"],
   response_types=["application/json"],
   inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
   transform_instances=["ml.m5.xlarge"],
   model_package_group_name='sipgroup',
)

step_model_registration = ModelStep(
   name="AbaloneRegisterModel",
   step_args=register_model_step_args,
)
```

## Create model step
<a name="step-type-create-model"></a>

You use a Create model step to create a SageMaker AI model. For more information on SageMaker AI models, see [Train a Model with Amazon SageMaker](how-it-works-training.md).

A create model step requires model artifacts and information about the SageMaker AI instance type that you need to use to create the model. The following examples show how to create a Create model step definition. For more information about Create model step requirements, see the [sagemaker.workflow.steps.CreateModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.CreateModelStep) documentation.

------
#### [ Pipeline Designer ]

To add a create model step to your pipeline, do the following:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Create model** and drag it to the canvas.

1. In the canvas, choose the **Create model** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.steps.CreateModelStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.CreateModelStep).

1. If the canvas includes any step that immediately precedes the **Create model** step you added, click and drag the cursor from the step to the **Create model** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Create model** step you added, click and drag the cursor from the **Create model** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

**Important**  
We recommend using [Model step](#step-type-model) to create models as of v2.90.0 of the SageMaker AI Python SDK. `CreateModelStep` will continue to work in previous versions of the SageMaker Python SDK, but is no longer actively supported.

```
from sagemaker.workflow.steps import CreateModelStep

step_create_model = CreateModelStep(
    name="AbaloneCreateModel",
    model=best_model,
    inputs=inputs
)
```

------

## Register model step
<a name="step-type-register-model"></a>

The Register model step registers a model into the SageMaker Model Registry.

------
#### [ Pipeline Designer ]

To register a model from a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Register model** and drag it to the canvas.

1. In the canvas, choose the **Register model** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.step\$1collections.RegisterModel](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_collections.RegisterModel).

1. If the canvas includes any step that immediately precedes the **Register model** step you added, click and drag the cursor from the step to the **Register model** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Register model** step you added, click and drag the cursor from the **Register model** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

**Important**  
We recommend using [Model step](#step-type-model) to register models as of v2.90.0 of the SageMaker AI Python SDK. `RegisterModel` will continue to work in previous versions of the SageMaker Python SDK, but is no longer actively supported.

You use a `RegisterModel` step to register a [sagemaker.model.Model](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html) or a [sagemaker.pipeline.PipelineModel](https://sagemaker.readthedocs.io/en/stable/api/inference/pipeline.html#pipelinemodel) with the Amazon SageMaker Model Registry. A `PipelineModel` represents an inference pipeline, which is a model composed of a linear sequence of containers that process inference requests.

For more information about how to register a model, see [Model Registration Deployment with Model Registry](model-registry.md). For more information on `RegisterModel` step requirements, see the [sagemaker.workflow.step\$1collections.RegisterModel](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_collections.RegisterModel) documentation.

The following example shows how to create a `RegisterModel` step that registers a `PipelineModel`.

```
import time
from sagemaker.sklearn import SKLearnModel
from sagemaker.xgboost import XGBoostModel

code_location = 's3://{0}/{1}/code'.format(bucket_name, prefix)

sklearn_model = SKLearnModel(model_data=processing_step.properties.ProcessingOutputConfig.Outputs['model'].S3Output.S3Uri,
 entry_point='inference.py',
 source_dir='sklearn_source_dir/',
 code_location=code_location,
 framework_version='1.0-1',
 role=role,
 sagemaker_session=sagemaker_session,
 py_version='py3')

xgboost_model = XGBoostModel(model_data=training_step.properties.ModelArtifacts.S3ModelArtifacts,
 entry_point='inference.py',
 source_dir='xgboost_source_dir/',
 code_location=code_location,
 framework_version='0.90-2',
 py_version='py3',
 sagemaker_session=sagemaker_session,
 role=role)

from sagemaker.workflow.step_collections import RegisterModel
from sagemaker import PipelineModel
pipeline_model = PipelineModel(models=[sklearn_model,xgboost_model],role=role,sagemaker_session=sagemaker_session)

step_register = RegisterModel(
 name="AbaloneRegisterModel",
 model=pipeline_model,
 content_types=["application/json"],
 response_types=["application/json"],
 inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
 transform_instances=["ml.m5.xlarge"],
 model_package_group_name='sipgroup',
)
```

If `model` isn't provided, the register model step requires an estimator as shown in the following example.

```
from sagemaker.workflow.step_collections import RegisterModel

step_register = RegisterModel(
    name="AbaloneRegisterModel",
    estimator=xgb_train,
    model_data=step_train.properties.ModelArtifacts.S3ModelArtifacts,
    content_types=["text/csv"],
    response_types=["text/csv"],
    inference_instances=["ml.t2.medium", "ml.m5.xlarge"],
    transform_instances=["ml.m5.xlarge"],
    model_package_group_name=model_package_group_name,
    approval_status=model_approval_status,
    model_metrics=model_metrics
)
```

------

## Deploy model (endpoint) step
<a name="step-type-deploy-model-endpoint"></a>

In the Pipeline Designer, use the Deploy model (endpoint) step to deploy your model to an endpoint. You can create a new endpoint or use an existing endpoint. Real-time inference is ideal for inference workloads where you have real-time, interactive, low latency requirements. You can deploy your model to SageMaker AI Hosting services and get a real-time endpoint that can be used for inference. These endpoints are fully managed and support auto-scaling. To learn more about real-time inference in SageMaker AI, see [Real-time inference](realtime-endpoints.md).

Before adding a deploy model step to your pipeline, make sure that your IAM role has the following permissions:
+ `sagemaker:CreateModel`
+ `sagemaker:CreateEndpointConfig`
+ `sagemaker:CreateEndpoint`
+ `sagemaker:UpdateEndpoint`
+ `sagemaker:DescribeModel`
+ `sagemaker:DescribeEndpointConfig`
+ `sagemaker:DescribeEndpoint`

To learn more about all the required permissions for SageMaker AI and how to set them up, see [Amazon SageMaker AI API Permissions: Actions, Permissions, and Resources Reference](api-permissions-reference.md).

To add a model deployment step to your Pipeline in the drag-and-drop editor, complete the following steps:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Deploy model (endpoint)** and drag it to the canvas.

1. In the canvas, choose the **Deploy model (endpoint)** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs.

1. If the canvas includes any step that immediately precedes the **Deploy model (endpoint)** step you added, click and drag the cursor from the step to the **Deploy model (endpoint)** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Deploy model (endpoint)** step you added, click and drag the cursor from the **Deploy model (endpoint)** step to the step to create an edge.

## Transform step
<a name="step-type-transform"></a>

You use a transform step for batch transformation to run inference on an entire dataset. For more information about batch transformation, see [Batch transforms with inference pipelines](inference-pipeline-batch.md).

A transform step requires a transformer and the data on which to run batch transformation. The following example shows how to create a Transform step definition. For more information on Transform step requirements, see the [sagemaker.workflow.steps.TransformStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TransformStep) documentation.

------
#### [ Pipeline Designer ]

To add a batch transform step to your pipeline using the drag-and-drop visual editor, do the following:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Deploy model (batch transform)** and drag it to the canvas.

1. In the canvas, choose the **Deploy model (batch transform)** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.steps.TransformStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.steps.TransformStep).

1. If the canvas includes any step that immediately precedes the **Deploy model (batch transform)** step you added, click and drag the cursor from the step to the **Deploy model (batch transform)** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Deploy model (batch transform)** step you added, click and drag the cursor from the **Deploy model (batch transform)** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

```
from sagemaker.workflow.pipeline_context import PipelineSession

from sagemaker.transformer import Transformer
from sagemaker.inputs import TransformInput
from sagemaker.workflow.steps import TransformStep

transformer = Transformer(..., sagemaker_session=PipelineSession())

step_transform = TransformStep(
    name="AbaloneTransform",
    step_args=transformer.transform(data="s3://amzn-s3-demo-bucket/my-data"),
)
```

------

## Condition step
<a name="step-type-condition"></a>

You use a condition step to evaluate the condition of step properties to assess which action should be taken next in the pipeline.

A condition step requires:
+ A list of conditions.
+ A list of steps to run if the condition evaluates to `true`.
+ A list of steps to run if the condition evaluates to `false`.

------
#### [ Pipeline Designer ]

To add a condition step to a pipeline using the Pipeline Designer, do the following:

1. Open the Amazon SageMaker Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Condition** and drag it to the canvas.

1. In the canvas, choose the **Condition** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.condition\$1step.ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.condition_step.ConditionStep).

1. If the canvas includes any step that immediately precedes the **Condition** step you added, click and drag the cursor from the step to the **Condition** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Condition** step you added, click and drag the cursor from the **Condition** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

 The following example shows how to create a `ConditionStep` definition. 

**Limitations**
+ Pipelines doesn't support the use of nested condition steps. You can't pass a condition step as the input for another condition step.
+ A condition step can't use identical steps in both branches. If you need the same step functionality in both branches, duplicate the step and give it a different name.

```
from sagemaker.workflow.conditions import ConditionLessThanOrEqualTo
from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.functions import JsonGet

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=evaluation_report,
        json_path="regression_metrics.mse.value"
    ),
    right=6.0
)

step_cond = ConditionStep(
    name="AbaloneMSECond",
    conditions=[cond_lte],
    if_steps=[step_register, step_create_model, step_transform],
    else_steps=[]
)
```

For more information on `ConditionStep` requirements, see the [sagemaker.workflow.condition\$1step.ConditionStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#conditionstep) API reference. For more information on supported conditions, see *[Amazon SageMaker Pipelines - Conditions](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#conditions)* in the SageMaker AI Python SDK documentation. 

------

## Callback step
<a name="step-type-callback"></a>

Use a `Callback` step to add additional processes and AWS services into your workflow that aren't directly provided by Amazon SageMaker Pipelines. When a `Callback` step runs, the following procedure occurs:
+ Pipelines sends a message to a customer-specified Amazon Simple Queue Service (Amazon SQS) queue. The message contains a Pipelines–generated token and a customer-supplied list of input parameters. After sending the message, Pipelines waits for a response from the customer.
+ The customer retrieves the message from the Amazon SQS queue and starts their custom process.
+ When the process finishes, the customer calls one of the following APIs and submits the Pipelines–generated token:
  +  [SendPipelineExecutionStepSuccess](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_SendPipelineExecutionStepSuccess.html), along with a list of output parameters
  +  [SendPipelineExecutionStepFailure](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_SendPipelineExecutionStepFailure.html), along with a failure reason
+ The API call causes Pipelines to either continue the pipeline process or fail the process.

For more information on `Callback` step requirements, see the [sagemaker.workflow.callback\$1step.CallbackStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.callback_step.CallbackStep) documentation. For a complete solution, see [Extend SageMaker Pipelines to include custom steps using callback steps](https://aws.amazon.com/blogs/machine-learning/extend-amazon-sagemaker-pipelines-to-include-custom-steps-using-callback-steps/).

**Important**  
`Callback` steps were introduced in Amazon SageMaker Python SDK v2.45.0 and Amazon SageMaker Studio Classic v3.6.2. You must update Studio Classic before you use a `Callback` step or the pipeline DAG doesn't display. To update Studio Classic, see [Shut Down and Update Amazon SageMaker Studio Classic](studio-tasks-update-studio.md).

The following sample shows an implementation of the preceding procedure.

```
from sagemaker.workflow.callback_step import CallbackStep

step_callback = CallbackStep(
    name="MyCallbackStep",
    sqs_queue_url="https://sqs.us-east-2.amazonaws.com/012345678901/MyCallbackQueue",
    inputs={...},
    outputs=[...]
)

callback_handler_code = '
    import boto3
    import json

    def handler(event, context):
        sagemaker_client=boto3.client("sagemaker")

        for record in event["Records"]:
            payload=json.loads(record["body"])
            token=payload["token"]

            # Custom processing

            # Call SageMaker AI to complete the step
            sagemaker_client.send_pipeline_execution_step_success(
                CallbackToken=token,
                OutputParameters={...}
            )
'
```

**Note**  
Output parameters for `CallbackStep` should not be nested. For example, if you use a nested dictionary as your output parameter, then the dictionary is treated as a single string (ex. `{"output1": "{\"nested_output1\":\"my-output\"}"}`). If you provide a nested value, then when you try to refer to a particular output parameter, SageMaker AI throws a non-retryable client error.

**Stopping behavior**

A pipeline process doesn't stop while a `Callback` step is running.

When you call [StopPipelineExecution](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_StopPipelineExecution.html) on a pipeline process with a running `Callback` step, Pipelines sends an Amazon SQS message to the SQS queue. The body of the SQS message contains a **Status** field, which is set to `Stopping`. The following shows an example SQS message body.

```
{
  "token": "26vcYbeWsZ",
  "pipelineExecutionArn": "arn:aws:sagemaker:us-east-2:012345678901:pipeline/callback-pipeline/execution/7pinimwddh3a",
  "arguments": {
    "number": 5,
    "stringArg": "some-arg",
    "inputData": "s3://sagemaker-us-west-2-012345678901/abalone/abalone-dataset.csv"
  },
  "status": "Stopping"
}
```

You should add logic to your Amazon SQS message consumer to take any needed action (for example, resource cleanup) upon receipt of the message. Then add a call to `SendPipelineExecutionStepSuccess` or `SendPipelineExecutionStepFailure`.

Only when Pipelines receives one of these calls does it stop the pipeline process.

## Lambda step
<a name="step-type-lambda"></a>

You use a Lambda step to run an AWS Lambda function. You can run an existing Lambda function, or SageMaker AI can create and run a new Lambda function. If you choose to use an existing Lambda function, it must be in the same AWS Region as the SageMaker AI pipeline. For a notebook that shows how to use a Lambda step in a SageMaker AI pipeline, see [sagemaker-pipelines-lambda-step.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/lambda-step/sagemaker-pipelines-lambda-step.ipynb).

**Important**  
Lambda steps were introduced in Amazon SageMaker Python SDK v2.51.0 and Amazon SageMaker Studio Classic v3.9.1. You must update Studio Classic before you use a Lambda step or the pipeline DAG doesn't display. To update Studio Classic, see [Shut Down and Update Amazon SageMaker Studio Classic](studio-tasks-update-studio.md).

SageMaker AI provides the [sagemaker.lambda\$1helper.Lambda](https://sagemaker.readthedocs.io/en/stable/api/utility/lambda_helper.html) class to create, update, invoke, and delete Lambda functions. `Lambda` has the following signature.

```
Lambda(
    function_arn,       # Only required argument to invoke an existing Lambda function

    # The following arguments are required to create a Lambda function:
    function_name,
    execution_role_arn,
    zipped_code_dir,    # Specify either zipped_code_dir and s3_bucket, OR script
    s3_bucket,          # S3 bucket where zipped_code_dir is uploaded
    script,             # Path of Lambda function script
    handler,            # Lambda handler specified as "lambda_script.lambda_handler"
    timeout,            # Maximum time the Lambda function can run before the lambda step fails
    ...
)
```

The [sagemaker.workflow.lambda\$1step.LambdaStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.lambda_step.LambdaStep) class has a `lambda_func` argument of type `Lambda`. To invoke an existing Lambda function, the only requirement is to supply the Amazon Resource Name (ARN) of the function to `function_arn`. If you don't supply a value for `function_arn`, you must specify `handler` and one of the following:
+ `zipped_code_dir` – The path of the zipped Lambda function

  `s3_bucket` – Amazon S3 bucket where `zipped_code_dir` is to be uploaded
+ `script` – The path of the Lambda function script file

The following example shows how to create a `Lambda` step definition that invokes an existing Lambda function.

```
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

step_lambda = LambdaStep(
    name="ProcessingLambda",
    lambda_func=Lambda(
        function_arn="arn:aws:lambda:us-west-2:012345678910:function:split-dataset-lambda"
    ),
    inputs={
        s3_bucket = s3_bucket,
        data_file = data_file
    },
    outputs=[
        "train_file", "test_file"
    ]
)
```

The following example shows how to create a `Lambda` step definition that creates and invokes a Lambda function using a Lambda function script.

```
from sagemaker.workflow.lambda_step import LambdaStep
from sagemaker.lambda_helper import Lambda

step_lambda = LambdaStep(
    name="ProcessingLambda",
    lambda_func=Lambda(
      function_name="split-dataset-lambda",
      execution_role_arn=execution_role_arn,
      script="lambda_script.py",
      handler="lambda_script.lambda_handler",
      ...
    ),
    inputs={
        s3_bucket = s3_bucket,
        data_file = data_file
    },
    outputs=[
        "train_file", "test_file"
    ]
)
```

**Inputs and outputs**

If your `Lambda` function has inputs or outputs, these must also be defined in your `Lambda` step.

**Note**  
Input and output parameters should not be nested. For example, if you use a nested dictionary as your output parameter, then the dictionary is treated as a single string (ex. `{"output1": "{\"nested_output1\":\"my-output\"}"}`). If you provide a nested value and try to refer to it later, a non-retryable client error is thrown.

When defining the `Lambda` step, `inputs` must be a dictionary of key-value pairs. Each value of the `inputs` dictionary must be a primitive type (string, integer, or float). Nested objects are not supported. If left undefined, the `inputs` value defaults to `None`.

The `outputs` value must be a list of keys. These keys refer to a dictionary defined in the output of the `Lambda` function. Like `inputs`, these keys must be primitive types, and nested objects are not supported.

**Timeout and stopping behavior**

The `Lambda` class has a `timeout` argument that specifies the maximum time that the Lambda function can run. The default value is 120 seconds with a maximum value of 10 minutes. If the Lambda function is running when the timeout is met, the Lambda step fails; however, the Lambda function continues to run.

A pipeline process can't be stopped while a Lambda step is running because the Lambda function invoked by the Lambda step can't be stopped. If you stop the process while the Lambda function is running, the pipeline waits for the function to finish or until the timeout is hit. This depends on whichever occurs first. The process then stops. If the Lambda function finishes, the pipeline process status is `Stopped`. If the timeout is hit the pipeline process status is `Failed`.

## ClarifyCheck step
<a name="step-type-clarify-check"></a>

You can use the `ClarifyCheck` step to conduct baseline drift checks against previous baselines for bias analysis and model explainability. You can then generate and [register your baselines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-quality-clarify-baseline-lifecycle.html#pipelines-quality-clarify-baseline-calculations) with the `model.register()` method and pass the output of that method to [Model step](#step-type-model) using `[step\$1args](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#model-step)`. These baselines for drift check can be used by Amazon SageMaker Model Monitor for your model endpoints. As a result, you don’t need to do a [baseline](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-create-baseline.html) suggestion separately. 

The `ClarifyCheck` step can also pull baselines for drift check from the model registry. The `ClarifyCheck` step uses the SageMaker Clarify prebuilt container. This container provides a range of model monitoring capabilities, including constraint suggestion and constraint validation against a given baseline. For more information, see [Prebuilt SageMaker Clarify Containers](clarify-processing-job-configure-container.md).

### Configuring the ClarifyCheck step
<a name="configuring-step-type-clarify"></a>

You can configure the `ClarifyCheck` step to conduct only one of the following check types each time it’s used in a pipeline.
+ Data bias check
+ Model bias check
+ Model explainability check

To do this, set the `clarify_check_config` parameter with one of the following check type values:
+ `DataBiasCheckConfig`
+ `ModelBiasCheckConfig`
+ `ModelExplainabilityCheckConfig`

The `ClarifyCheck` step launches a processing job that runs the SageMaker AI Clarify prebuilt container and requires dedicated [configurations for the check and the processing job](https://docs.aws.amazon.com/sagemaker/latest/dg/clarify-configure-processing-jobs.html). `ClarifyCheckConfig` and `CheckJobConfig` are helper functions for these configurations. These helper functions are aligned with how the SageMaker Clarify processing job computes for checking model bias, data bias, or model explainability. For more information, see [Run SageMaker Clarify Processing Jobs for Bias Analysis and Explainability](clarify-processing-job-run.md). 

### Controlling step behaviors for drift check
<a name="controlling-step-type-clarify"></a>

The `ClarifyCheck` step requires the following two boolean flags to control its behavior:
+ `skip_check`: This parameter indicates if the drift check against the previous baseline is skipped or not. If it is set to `False`, the previous baseline of the configured check type must be available.
+ `register_new_baseline`: This parameter indicates if a newly calculated baseline can be accessed though step property `BaselineUsedForDriftCheckConstraints`. If it is set to `False`, the previous baseline of the configured check type also must be available. This can be accessed through the `BaselineUsedForDriftCheckConstraints` property. 

For more information, see [Baseline calculation, drift detection and lifecycle with ClarifyCheck and QualityCheck steps in Amazon SageMaker Pipelines](pipelines-quality-clarify-baseline-lifecycle.md).

### Working with baselines
<a name="step-type-clarify-working-with-baselines"></a>

You can optionally specify the `model_package_group_name` to locate the existing baseline. Then, the `ClarifyCheck` step pulls the `DriftCheckBaselines` on the latest approved model package in the model package group. 

Or, you can provide a previous baseline through the `supplied_baseline_constraints` parameter. If you specify both the `model_package_group_name` and the `supplied_baseline_constraints`, the `ClarifyCheck` step uses the baseline specified by the `supplied_baseline_constraints` parameter.

For more information on using the `ClarifyCheck` step requirements, see the [ sagemaker.workflow.steps.ClarifyCheckStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.clarify_check_step.ClarifyCheckStep) in the *Amazon SageMaker AI SageMaker AI SDK for Python*. For an Amazon SageMaker Studio Classic notebook that shows how to use `ClarifyCheck` step in Pipelines, see [sagemaker-pipeline-model-monitor-clarify-steps.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/model-monitor-clarify-pipelines/sagemaker-pipeline-model-monitor-clarify-steps.ipynb).

**Example Create a `ClarifyCheck` step for data bias check**  

```
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.clarify_check_step import DataBiasCheckConfig, ClarifyCheckStep
from sagemaker.workflow.execution_variables import ExecutionVariables

check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    volume_size_in_gb=120,
    sagemaker_session=sagemaker_session,
)

data_bias_data_config = DataConfig(
    s3_data_input_path=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    s3_output_path=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'databiascheckstep']),
    label=0,
    dataset_type="text/csv",
    s3_analysis_config_output_path=data_bias_analysis_cfg_output_path,
)

data_bias_config = BiasConfig(
    label_values_or_threshold=[15.0], facet_name=[8], facet_values_or_threshold=[[0.5]]  
)

data_bias_check_config = DataBiasCheckConfig(
    data_config=data_bias_data_config,
    data_bias_config=data_bias_config,
)h

data_bias_check_step = ClarifyCheckStep(
    name="DataBiasCheckStep",
    clarify_check_config=data_bias_check_config,
    check_job_config=check_job_config,
    skip_check=False,
    register_new_baseline=False
   supplied_baseline_constraints="s3://sagemaker-us-west-2-111122223333/baseline/analysis.json",
    model_package_group_name="MyModelPackageGroup"
)
```

## QualityCheck step
<a name="step-type-quality-check"></a>

Use the `QualityCheck` step to conduct [baseline suggestions](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-create-baseline.html) and drift checks against a previous baseline for data quality or model quality in a pipeline. You can then generate and [register your baselines](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-quality-clarify-baseline-lifecycle.html#pipelines-quality-clarify-baseline-calculations) with the `model.register()` method and pass the output of that method to [Model step](#step-type-model) using `[step\$1args](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#model-step)`. ]

Model Monitor can use these baselines for drift check for your model endpoints so that you don’t need to do a baseline suggestion separately. The `QualityCheck` step can also pull baselines for drift check from the model registry. The `QualityCheck` step leverages the Amazon SageMaker AI Model Monitor prebuilt container. This container has a range of model monitoring capabilities including constraint suggestion, statistics generation, and constraint validation against a baseline. For more information, see [Amazon SageMaker Model Monitor prebuilt container](model-monitor-pre-built-container.md).

### Configuring the QualityCheck step
<a name="configuring-step-type-quality"></a>

You can configure the `QualityCheck` step to run only one of the following check types each time it’s used in a pipeline.
+ Data quality check
+ Model quality check

You do this by setting the `quality_check_config` parameter with one of the following check type values:
+ `DataQualityCheckConfig`
+ `ModelQualityCheckConfig`

The `QualityCheck` step launches a processing job that runs the Model Monitor prebuilt container and requires dedicated configurations for the check and the processing job. The `QualityCheckConfig` and `CheckJobConfig` are helper functions for these configurations. These helper functions are aligned with how Model Monitor creates a baseline for the model quality or data quality monitoring. For more information on the Model Monitor baseline suggestions, see [Create a Baseline](model-monitor-create-baseline.md) and [Create a model quality baseline](model-monitor-model-quality-baseline.md).

### Controlling step behaviors for drift check
<a name="controlling-step-type-quality"></a>

The `QualityCheck` step requires the following two Boolean flags to control its behavior:
+ `skip_check`: This parameter indicates if the drift check against the previous baseline is skipped or not. If it is set to `False`, the previous baseline of the configured check type must be available.
+ `register_new_baseline`: This parameter indicates if a newly calculated baseline can be accessed through step properties `BaselineUsedForDriftCheckConstraints` and `BaselineUsedForDriftCheckStatistics`. If it is set to `False`, the previous baseline of the configured check type must also be available. These can be accessed through the `BaselineUsedForDriftCheckConstraints` and `BaselineUsedForDriftCheckStatistics` properties.

For more information, see [Baseline calculation, drift detection and lifecycle with ClarifyCheck and QualityCheck steps in Amazon SageMaker Pipelines](pipelines-quality-clarify-baseline-lifecycle.md).

### Working with baselines
<a name="step-type-quality-working-with-baselines"></a>

You can specify a previous baseline directly through the `supplied_baseline_statistics` and `supplied_baseline_constraints` parameters. You can also specify the `model_package_group_name` and the `QualityCheck` step pulls the `DriftCheckBaselines` on the latest approved model package in the model package group. 

When you specify the following, the `QualityCheck` step uses the baseline specified by `supplied_baseline_constraints` and `supplied_baseline_statistics` on the check type of the `QualityCheck` step.
+ `model_package_group_name`
+ `supplied_baseline_constraints`
+ `supplied_baseline_statistics`

For more information on using the `QualityCheck` step requirements, see the [sagemaker.workflow.steps.QualityCheckStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.quality_check_step.QualityCheckStep) in the *Amazon SageMaker AI SageMaker AI SDK for Python*. For an Amazon SageMaker Studio Classic notebook that shows how to use `QualityCheck` step in Pipelines, see [sagemaker-pipeline-model-monitor-clarify-steps.ipynb](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/model-monitor-clarify-pipelines/sagemaker-pipeline-model-monitor-clarify-steps.ipynb). 

**Example Create a `QualityCheck` step for data quality check**  

```
from sagemaker.workflow.check_job_config import CheckJobConfig
from sagemaker.workflow.quality_check_step import DataQualityCheckConfig, QualityCheckStep
from sagemaker.workflow.execution_variables import ExecutionVariables

check_job_config = CheckJobConfig(
    role=role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    volume_size_in_gb=120,
    sagemaker_session=sagemaker_session,
)

data_quality_check_config = DataQualityCheckConfig(
    baseline_dataset=step_process.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
    dataset_format=DatasetFormat.csv(header=False, output_columns_position="START"),
    output_s3_uri=Join(on='/', values=['s3:/', your_bucket, base_job_prefix, ExecutionVariables.PIPELINE_EXECUTION_ID, 'dataqualitycheckstep'])
)

data_quality_check_step = QualityCheckStep(
    name="DataQualityCheckStep",
    skip_check=False,
    register_new_baseline=False,
    quality_check_config=data_quality_check_config,
    check_job_config=check_job_config,
    supplied_baseline_statistics="s3://sagemaker-us-west-2-555555555555/baseline/statistics.json",
    supplied_baseline_constraints="s3://sagemaker-us-west-2-555555555555/baseline/constraints.json",
    model_package_group_name="MyModelPackageGroup"
)
```

## EMR step
<a name="step-type-emr"></a>

Use the Amazon SageMaker Pipelines [EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-overview.html) step to:
+ Process [Amazon EMR steps](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-work-with-steps.html) on a running Amazon EMR cluster.
+ Have the pipeline create and manage an Amazon EMR cluster for you.

For more information about Amazon EMR, see [Getting started with Amazon EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs.html).

The EMR step requires that `EMRStepConfig` include the location of the JAR file used by the Amazon EMR cluster and any arguments to be passed. You also provide the Amazon EMR cluster ID if you want to run the step on a running EMR cluster. You can also pass the cluster configuration to run the EMR step on a cluster that it creates, manages, and terminates for you. The following sections include examples and links to sample notebooks demonstrating both methods.

**Note**  
EMR steps require that the role passed to your pipeline has additional permissions. Attach the [AWS managed policy: `AmazonSageMakerPipelinesIntegrations`](https://docs.aws.amazon.com/sagemaker/latest/dg/security-iam-awsmanpol-pipelines.html#security-iam-awsmanpol-AmazonSageMakerPipelinesIntegrations) to your pipeline role, or ensure that the role includes the permissions in that policy.
If you process an EMR step on a running cluster, you can only use a cluster that is in one of the following states:   
`STARTING`
`BOOTSTRAPPING`
`RUNNING`
`WAITING`
If you process EMR steps on a running cluster, you can have at most 256 EMR steps in a `PENDING` state on an EMR cluster. EMR steps submitted beyond this limit result in pipeline execution failure. You may consider using [Retry Policy for Pipeline Steps](pipelines-retry-policy.md).
You can specify either cluster ID or cluster configuration, but not both.
The EMR step relies on Amazon EventBridge to monitor changes in the EMR step or cluster state. If you process your Amazon EMR job on a running cluster, the EMR step uses the `SageMakerPipelineExecutionEMRStepStatusUpdateRule` rule to monitor EMR step state. If you process your job on a cluster that the EMR step creates, the step uses the `SageMakerPipelineExecutionEMRClusterStatusRule` rule to monitor changes in cluster state. If you see either of these EventBridge rules in your AWS account, do not delete them or else your EMR step may not complete.

**Add an Amazon EMR step to your pipeline**

To add an EMR step to your pipeline, do the following:
+ Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).
+ In the left navigation pane, select **Pipelines**.
+ Choose **Create**.
+ Choose **Blank**.
+ In the left sidebar, choose **Process data** and drag it to the canvas.
+ In the canvas, choose the **Process data** step you added.
+ In the right sidebar, under mode, choose **EMR (managed)**.
+ In the right sidebar, complete the forms in the **Setting and Details** tabs. For information about the fields in these tabs, see [sagemaker.workflow.fail\$1step.EMRstep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.emr_step.EMRStep).

**Launch a new job on a running Amazon EMR cluster**

To launch a new job on a running Amazon EMR cluster, pass the cluster ID as a string to the `cluster_id` argument of `EMRStep`. The following example demonstrates this procedure.

```
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig

emr_config = EMRStepConfig(
    jar="jar-location", # required, path to jar file used
    args=["--verbose", "--force"], # optional list of arguments to pass to the jar
    main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest 
    properties=[ # optional list of Java properties that are set when the step runs
    {
        "key": "mapred.tasktracker.map.tasks.maximum",
        "value": "2"
    },
    {
        "key": "mapreduce.map.sort.spill.percent",
        "value": "0.90"
   },
   {
       "key": "mapreduce.tasktracker.reduce.tasks.maximum",
       "value": "5"
    }
  ]
)

step_emr = EMRStep (
    name="EMRSampleStep", # required
    cluster_id="j-1ABCDEFG2HIJK", # include cluster_id to use a running cluster
    step_config=emr_config, # required
    display_name="My EMR Step",
    description="Pipeline step to execute EMR job"
)
```

For a sample notebook that guides you through a complete example, see [ Pipelines EMR Step With Running EMR Cluster](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/emr-step/sagemaker-pipelines-emr-step-with-running-emr-cluster.ipynb).

**Launch a new job on a new Amazon EMR cluster**

To launch a new job on a new cluster that `EMRStep` creates for you, provide your cluster configuration as a dictionary. The dictionary must have the same structure as a [RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html) request. However, do not include the following fields in your cluster configuration:
+ [`Name`]
+ [`Steps`]
+ [`AutoTerminationPolicy`]
+ [`Instances`][`KeepJobFlowAliveWhenNoSteps`]
+ [`Instances`][`TerminationProtected`]

All other `RunJobFlow` arguments are available for use in your cluster configuration. For details about the request syntax, see [RunJobFlow](https://docs.aws.amazon.com/emr/latest/APIReference/API_RunJobFlow.html).

The following example passes a cluster configuration to an EMR step definition. This prompts the step to launch a new job on a new EMR cluster. The EMR cluster configuration in this example includes specifications for primary and core EMR cluster nodes. For more information about Amazon EMR node types, see [ Understand node types: primary, core, and task nodes](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-master-core-task-nodes.html).

```
from sagemaker.workflow.emr_step import EMRStep, EMRStepConfig

emr_step_config = EMRStepConfig(
    jar="jar-location", # required, path to jar file used
    args=["--verbose", "--force"], # optional list of arguments to pass to the jar
    main_class="com.my.Main1", # optional main class, this can be omitted if jar above has a manifest 
    properties=[ # optional list of Java properties that are set when the step runs
    {
        "key": "mapred.tasktracker.map.tasks.maximum",
        "value": "2"
    },
    {
        "key": "mapreduce.map.sort.spill.percent",
        "value": "0.90"
   },
   {
       "key": "mapreduce.tasktracker.reduce.tasks.maximum",
       "value": "5"
    }
  ]
)

# include your cluster configuration as a dictionary
emr_cluster_config = {
    "Applications": [
        {
            "Name": "Spark", 
        }
    ],
    "Instances":{
        "InstanceGroups":[
            {
                "InstanceRole": "MASTER",
                "InstanceCount": 1,
                "InstanceType": "m5.2xlarge"
            },
            {
                "InstanceRole": "CORE",
                "InstanceCount": 2,
                "InstanceType": "m5.2xlarge"
            }
        ]
    },
    "BootstrapActions":[],
    "ReleaseLabel": "emr-6.6.0",
    "JobFlowRole": "job-flow-role",
    "ServiceRole": "service-role"
}

emr_step = EMRStep(
    name="emr-step",
    cluster_id=None,
    display_name="emr_step",
    description="MyEMRStepDescription",
    step_config=emr_step_config,
    cluster_config=emr_cluster_config
)
```

For a sample notebook that guides you through a complete example, see [ Pipelines EMR Step With Cluster Lifecycle Management](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-pipelines/tabular/emr-step/sagemaker-pipelines-emr-step-with-cluster-lifecycle-management.ipynb).

## EMR serverless step
<a name="step-type-serverless"></a>

To add an EMR serverless step to your pipeline, do the following:
+ Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-updated-launch.html).
+ In the left navigation pane, select **Pipelines**.
+ Choose **Create**.
+ Choose **Blank**.
+ In the left sidebar, choose **Process data** and drag it to the canvas.
+ In the canvas, choose the **Process data** step you added.
+ In the right sidebar, under mode, choose **EMR (serverless)**.
+ In the right sidebar, complete the forms in the **Setting and Details** tabs.

## Notebook job step
<a name="step-type-notebook-job"></a>

Use a `NotebookJobStep` to run your SageMaker Notebook Job non-interactively as a pipeline step. If you build your pipeline in the Pipelines drag-and-drop UI, use the [Execute code step](#step-type-executecode) to run your notebook. For more information about SageMaker Notebook Jobs, see [SageMaker Notebook Jobs](notebook-auto-run.md).

A `NotebookJobStep` requires at minimum an input notebook, image URI and kernel name. For more information about Notebook Job step requirements and other parameters you can set to customize your step, see [sagemaker.workflow.steps.NotebookJobStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.notebook_job_step.NotebookJobStep).

The following example uses minimum arguments to define a `NotebookJobStep`.

```
from sagemaker.workflow.notebook_job_step import NotebookJobStep


notebook_job_step = NotebookJobStep(
    input_notebook=input_notebook,
    image_uri=image_uri,
    kernel_name=kernel_name
)
```

Your `NotebookJobStep` pipeline step is treated as a SageMaker notebook job. As a result, track the execution status in the Studio Classic UI notebook job dashboard by including specific tags with the `tags` argument. For more details about tags to include, see [View your notebook jobs in the Studio UI dashboard](create-notebook-auto-run-sdk.md#create-notebook-auto-run-dash).

Also, if you schedule your notebook job using the SageMaker Python SDK, you can only specify certain images to run your notebook job. For more information, see [Image constraints for SageMaker AI Python SDK notebook jobs](notebook-auto-run-constraints.md#notebook-auto-run-constraints-image-sdk).

## Fail step
<a name="step-type-fail"></a>

Use a Fail step to stop an Amazon SageMaker Pipelines execution when a desired condition or state is not achieved. The Fail step also allows you to enter a custom error message, indicating the cause of the pipeline's execution failure.

**Note**  
When a Fail step and other pipeline steps execute at the same time, the pipeline does not terminate until all concurrent steps are completed.

### Limitations for using Fail step
<a name="step-type-fail-limitations"></a>
+ You cannot add a Fail step to the `DependsOn` list of other steps. For more information, see [Custom dependency between steps](build-and-manage-steps.md#build-and-manage-custom-dependency).
+ Other steps cannot reference the Fail step. It is *always* the last step in a pipeline's execution.
+ You cannot retry a pipeline execution ending with a Fail step.

You can create the Fail step error message in the form of a static text string. Alternatively, you can also use [Pipeline Parameters](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-parameters.html), a [Join](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html?highlight=Join#sagemaker.workflow.functions.Join) operation, or other [step properties](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#build-and-manage-properties) to create a more informative error message if you use the SDK.

------
#### [ Pipeline Designer ]

To add a Fail step to your pipeline, do the following:

1. Open the Studio console by following the instructions in [Launch Amazon SageMaker Studio](studio-updated-launch.md).

1. In the left navigation pane, select **Pipelines**.

1. Choose **Create**.

1. Choose **Blank**.

1. In the left sidebar, choose **Fail** and drag it to the canvas.

1. In the canvas, choose the **Fail** step you added.

1. In the right sidebar, complete the forms in the **Setting** and **Details** tabs. For information about the fields in these tabs, see [ sagemaker.workflow.fail\$1step.FailStep](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.fail_step.FailStep).

1. If the canvas includes any step that immediately precedes the **Fail** step you added, click and drag the cursor from the step to the **Fail** step to create an edge.

1. If the canvas includes any step that immediately succeeds the **Fail** step you added, click and drag the cursor from the **Fail** step to the step to create an edge.

------
#### [ SageMaker Python SDK ]

**Example**  
The following example code snippet uses a `FailStep` with an `ErrorMessage` configured with Pipeline Parameters and a `Join` operation.  

```
from sagemaker.workflow.fail_step import FailStep
from sagemaker.workflow.functions import Join
from sagemaker.workflow.parameters import ParameterInteger

mse_threshold_param = ParameterInteger(name="MseThreshold", default_value=5)
step_fail = FailStep(
    name="AbaloneMSEFail",
    error_message=Join(
        on=" ", values=["Execution failed due to MSE >", mse_threshold_param]
    ),
)
```

------

# Add integration
<a name="build-and-manage-steps-integration"></a>

MLflow integration allows you to use MLflow with pipelines to select a tracking server or serverless application, choose an experiment, and log metrics.

## Key concepts
<a name="add-integration-key-concepts"></a>

**Default app creation** - A default MLflow application will be created when you enter the pipeline visual editor.

**Integrations panel** - A new integrations panel includes MLflow, which you can select and configure.

**Update app and experiment** - The option to override selected application and experiment during the pipeline execution.

## How it works
<a name="add-integration-how-it-works"></a>
+ Go to **Pipeline Visual Editor**
+ Choose **Integration** on the toolbar
+ Choose **MLflow**
+ Configure the MLflow app and experiment

## Example screenshots
<a name="add-integration-example-screenshots"></a>

Integrations side panel

![\[The to do description.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/screenshot-pipeline-1.png)


MLflow configuration

![\[The to do description.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/screenshot-pipeline-2.png)


How to override experiment during pipeline execution

![\[The to do description.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/screenshot-pipeline-3.png)


## Step properties
<a name="build-and-manage-properties"></a>

Use the `properties` attribute to add data dependencies between steps in the pipeline. Pipelines use these data dependencies to construct the DAG from the pipeline definition. These properties can be referenced as placeholder values and are resolved at runtime. 

The `properties` attribute of a Pipelines step matches the object returned by a `Describe` call for the corresponding SageMaker AI job type. For each job type, the `Describe` call returns the following response object:
+ `ProcessingStep` – [DescribeProcessingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeProcessingJob.html)
+ `TrainingStep` – [DescribeTrainingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTrainingJob.html)
+ `TransformStep` – [DescribeTransformJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DescribeTransformJob.html)

To check which properties are referrable for each step type during data dependency creation, see *[Data Dependency - Property Reference](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#data-dependency-property-reference)* in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).

## Step parallelism
<a name="build-and-manage-parallelism"></a>

When a step does not depend on any other step, it runs immediately upon pipeline execution. However, executing too many pipeline steps in parallel can quickly exhaust available resources. Control the number of concurrent steps for a pipeline execution with `ParallelismConfiguration`.

The following example uses `ParallelismConfiguration` to set the concurrent step limit to five.

```
pipeline.create(
    parallelism_config=ParallelismConfiguration(5),
)
```

## Data dependency between steps
<a name="build-and-manage-data-dependency"></a>

You define the structure of your DAG by specifying the data relationships between steps. To create data dependencies between steps, pass the properties of one step as the input to another step in the pipeline. The step receiving the input isn't started until after the step providing the input finishes running.

A data dependency uses JsonPath notation in the following format. This format traverses the JSON property file. This means you can append as many *<property>* instances as needed to reach the desired nested property in the file. For more information on JsonPath notation, see the [JsonPath repo](https://github.com/json-path/JsonPath).

```
<step_name>.properties.<property>.<property>
```

The following shows how to specify an Amazon S3 bucket using the `ProcessingOutputConfig` property of a processing step.

```
step_process.properties.ProcessingOutputConfig.Outputs["train_data"].S3Output.S3Uri
```

To create the data dependency, pass the bucket to a training step as follows.

```
from sagemaker.workflow.pipeline_context import PipelineSession

sklearn_train = SKLearn(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="CensusTrain",
    step_args=sklearn_train.fit(inputs=TrainingInput(
        s3_data=step_process.properties.ProcessingOutputConfig.Outputs[
            "train_data"].S3Output.S3Uri
    ))
)
```

To check which properties are referrable for each step type during data dependency creation, see *[Data Dependency - Property Reference](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#data-dependency-property-reference)* in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).

## Custom dependency between steps
<a name="build-and-manage-custom-dependency"></a>

When you specify a data dependency, Pipelines provides the data connection between the steps. Alternatively, one step can access the data from a previous step without directly using Pipelines. In this case, you can create a custom dependency that tells Pipelines not to start a step until after another step has finished running. You create a custom dependency by specifying a step's `DependsOn` attribute.

As an example, the following defines a step `C` that starts only after both step `A` and step `B` finish running.

```
{
  'Steps': [
    {'Name':'A', ...},
    {'Name':'B', ...},
    {'Name':'C', 'DependsOn': ['A', 'B']}
  ]
}
```

Pipelines throws a validation exception if the dependency would create a cyclic dependency.

The following example creates a training step that starts after a processing step finishes running.

```
processing_step = ProcessingStep(...)
training_step = TrainingStep(...)

training_step.add_depends_on([processing_step])
```

The following example creates a training step that doesn't start until two different processing steps finish running.

```
processing_step_1 = ProcessingStep(...)
processing_step_2 = ProcessingStep(...)

training_step = TrainingStep(...)

training_step.add_depends_on([processing_step_1, processing_step_2])
```

The following provides an alternate way to create the custom dependency.

```
training_step.add_depends_on([processing_step_1])
training_step.add_depends_on([processing_step_2])
```

The following example creates a training step that receives input from one processing step and waits for a different processing step to finish running.

```
processing_step_1 = ProcessingStep(...)
processing_step_2 = ProcessingStep(...)

training_step = TrainingStep(
    ...,
    inputs=TrainingInput(
        s3_data=processing_step_1.properties.ProcessingOutputConfig.Outputs[
            "train_data"
        ].S3Output.S3Uri
    )

training_step.add_depends_on([processing_step_2])
```

The following example shows how to retrieve a string list of the custom dependencies of a step.

```
custom_dependencies = training_step.depends_on
```

## Custom images in a step
<a name="build-and-manage-images"></a>

 You can use any of the available SageMaker AI [Deep Learning Container images](https://github.com/aws/deep-learning-containers/blob/master/available_images.md) when you create a step in your pipeline. 

You can also use your own container with pipeline steps. Because you can’t create an image from within Studio Classic, you must create your image using another method before using it with Pipelines.

To use your own container when creating the steps for your pipeline, include the image URI in the estimator definition. For more information on using your own container with SageMaker AI, see [Using Docker Containers with SageMaker AI](https://docs.aws.amazon.com/sagemaker/latest/dg/docker-containers.html).

# Lift-and-shift Python code with the @step decorator
<a name="pipelines-step-decorator"></a>

The `@step` decorator is a feature that converts your local machine learning (ML) code into one or more pipeline steps. You can write your ML function as you would for any ML project. Once tested locally or as a training job using the `@remote` decorator, you can convert the function to a SageMaker AI pipeline step by adding a `@step` decorator. You can then pass the output of the `@step`-decorated function call as a step to Pipelines to create and run a pipeline. You can chain a series of functions with the `@step` decorator to create a multi-step directed acyclic graph (DAG) pipeline as well.

The setup to use the `@step` decorator is the same as the setup to use the `@remote` decorator. You can refer to the remote function documentation for details about how to [setup the environment](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator.html#train-remote-decorator-env) and [use a configuration file](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html) to set defaults. For more information about the `@step` decorator, see [sagemaker.workflow.function\$1step.step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.step).

To view to sample notebooks that demonstrate the use of `@step` decorator, see [@step decorator sample notebooks](https://github.com/aws/amazon-sagemaker-examples/tree/main/sagemaker-pipelines/step-decorator).

The following sections explain how you can annotate your local ML code with a `@step` decorator to create a step, create and run a pipeline using the step, and customize the experience for your use case.

**Topics**
+ [Create a pipeline with `@step`-decorated functions](pipelines-step-decorator-create-pipeline.md)
+ [Run a pipeline](pipelines-step-decorator-run-pipeline.md)
+ [Configure your pipeline](pipelines-step-decorator-cfg-pipeline.md)
+ [Best Practices](pipelines-step-decorator-best.md)
+ [Limitations](pipelines-step-decorator-limit.md)

# Create a pipeline with `@step`-decorated functions
<a name="pipelines-step-decorator-create-pipeline"></a>

You can create a pipeline by converting Python functions into pipeline steps using the `@step` decorator, creating dependencies between those functions to create a pipeline graph (or directed acyclic graph (DAG)), and passing the leaf nodes of that graph as a list of steps to the pipeline. The following sections explain this procedure in detail with examples.

**Topics**
+ [Convert a function to a step](#pipelines-step-decorator-run-pipeline-convert)
+ [Create dependencies between the steps](#pipelines-step-decorator-run-pipeline-link)
+ [Use `ConditionStep` with `@step`-decorated steps](#pipelines-step-decorator-condition)
+ [Define a pipeline using the `DelayedReturn` output of steps](#pipelines-step-define-delayed)
+ [Create a pipeline](#pipelines-step-decorator-pipeline-create)

## Convert a function to a step
<a name="pipelines-step-decorator-run-pipeline-convert"></a>

To create a step using the `@step` decorator, annotate the function with `@step`. The following example shows a `@step`-decorated function that preprocesses the data.

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
    
step_process_result = preprocess(raw_data)
```

When you invoke a `@step`-decorated function, SageMaker AI returns a `DelayedReturn` instance instead of running the function. A `DelayedReturn` instance is a proxy for the actual return of that function. The `DelayedReturn` instance can be passed to another function as an argument or directly to a pipeline instance as a step. For information about the `DelayedReturn` class, see [sagemaker.workflow.function\$1step.DelayedReturn](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.function_step.DelayedReturn).

## Create dependencies between the steps
<a name="pipelines-step-decorator-run-pipeline-link"></a>

When you create a dependency between two steps, you create a connection between the steps in your pipeline graph. The following sections introduce multiple ways you can create a dependency between your pipeline steps.

### Data dependencies through input arguments
<a name="pipelines-step-decorator-run-pipeline-link-interstep"></a>

Passing in the `DelayedReturn` output of one function as an input to another function automatically creates a data dependency in the pipeline DAG. In the following example, passing in the `DelayedReturn` output of the `preprocess` function to the `train` function creates a dependency between `preprocess` and `train`.

```
from sagemaker.workflow.function_step import step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe

@step
def train(training_data):
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train(step_process_result)
```

The previous example defines a training function which is decorated with `@step`. When this function is invoked, it receives the `DelayedReturn` output of the preprocessing pipeline step as input. Invoking the training function returns another `DelayedReturn` instance. This instance holds the information about all the previous steps defined in that function (i.e, the `preprocess` step in this example) which form the pipeline DAG.

In the previous example, the `preprocess` function returns a single value. For more complex return types like lists or tuples, refer to [Limitations](pipelines-step-decorator-limit.md).

### Define custom dependencies
<a name="pipelines-step-decorator-run-pipeline-link-custom"></a>

In the previous example, the `train` function received the `DelayedReturn` output of `preprocess` and created a dependency. If you want to define the dependency explicitly without passing the previous step output, use the `add_depends_on` function with the step. You can use the `get_step()` function to retrieve the underlying step from its `DelayedReturn` instance, and then call `add_depends_on`\$1on with the dependency as input. To view the `get_step()` function definition, see [sagemaker.workflow.step\$1outputs.get\$1step](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.step_outputs.get_step). The following example shows you how to create a dependency between `preprocess` and `train` using `get_step()` and `add_depends_on()`.

```
from sagemaker.workflow.step_outputs import get_step

@step
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    processed_data = ..
    return s3.upload(processed_data)

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model

step_process_result = preprocess(raw_data)    
step_train_result = train()

get_step(step_train_result).add_depends_on([step_process_result])
```

### Pass data to and from a `@step`-decorated function to a traditional pipeline step
<a name="pipelines-step-decorator-run-pipeline-link-pass"></a>

You can create a pipeline that includes a `@step`-decorated step and a traditional pipeline step and passes data between them. For example, you can use `ProcessingStep` to process the data and pass its result to the `@step`-decorated training function. In the following example, a `@step`-decorated training step references the output of a processing step.

```
# Define processing step

from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.workflow.steps import ProcessingStep

sklearn_processor = SKLearnProcessor(
    framework_version='1.2-1',
    role='arn:aws:iam::123456789012:role/SagemakerExecutionRole',
    instance_type='ml.m5.large',
    instance_count='1',
)

inputs = [
    ProcessingInput(source=input_data, destination="/opt/ml/processing/input"),
]
outputs = [
    ProcessingOutput(output_name="train", source="/opt/ml/processing/train"),
    ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation"),
    ProcessingOutput(output_name="test", source="/opt/ml/processing/test")
]

process_step = ProcessingStep(
    name="MyProcessStep",
    step_args=sklearn_processor.run(inputs=inputs, outputs=outputs,code='preprocessing.py'),
)
```

```
# Define a @step-decorated train step which references the 
# output of a processing step

@step
def train(train_data_path, test_data_path):
    ...
    return trained_model
    
step_train_result = train(
   process_step.properties.ProcessingOutputConfig.Outputs["train"].S3Output.S3Uri,
   process_step.properties.ProcessingOutputConfig.Outputs["test"].S3Output.S3Uri,
)
```

## Use `ConditionStep` with `@step`-decorated steps
<a name="pipelines-step-decorator-condition"></a>

Pipelines supports a `ConditionStep` class which evaluates the results of preceding steps to decide what action to take in the pipeline. You can use `ConditionStep` with a `@step`-decorated step as well. To use the output of any `@step`-decorated step with `ConditionStep`, enter the output of that step as an argument to `ConditionStep`. In the following example, the condition step receives the output of the `@step`-decorated model evaluation step.

```
# Define steps

@step(name="evaluate")
def evaluate_model():
    # code to evaluate the model
    return {
        "rmse":rmse_value
    }
    
@step(name="register")
def register_model():
    # code to register the model
    ...
```

```
# Define ConditionStep

from sagemaker.workflow.condition_step import ConditionStep
from sagemaker.workflow.conditions import ConditionGreaterThanOrEqualTo
from sagemaker.workflow.fail_step import FailStep

conditionally_register = ConditionStep(
    name="conditional_register",
    conditions=[
        ConditionGreaterThanOrEqualTo(
            # Output of the evaluate step must be json serializable
            left=evaluate_model()["rmse"],  # 
            right=5,
        )
    ],
    if_steps=[FailStep(name="Fail", error_message="Model performance is not good enough")],
    else_steps=[register_model()],
)
```

## Define a pipeline using the `DelayedReturn` output of steps
<a name="pipelines-step-define-delayed"></a>

You define a pipeline the same way whether or not you use a `@step` decorator. When you pass a `DelayedReturn` instance to your pipeline, you don't need to pass a full list of steps to build the pipeline. The SDK automatically infers the previous steps based on the dependencies you define. All the previous steps of the `Step` objects you passed to the pipeline or `DelayedReturn` objects are included in the pipeline graph. In the following example, the pipeline receives the `DelayedReturn` object for the `train` function. SageMaker AI adds the `preprocess` step, as a previous step of `train`, to the pipeline graph.

```
from sagemaker.workflow.pipeline import Pipeline

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=<sagemaker-session>,
)
```

If there are no data or custom dependencies between the steps and you run multiple steps in parallel, the pipeline graph has more than one leaf node. Pass all of these leaf nodes in a list to the `steps` argument in your pipeline definition, as shown in the following example:

```
@step
def process1():
    ...
    return data
    
@step
def process2():
   ...
   return data
   
step_process1_result = process1()
step_process2_result = process2()

pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_process1_result, step_process2_result],
    sagemaker_session=sagemaker-session,
)
```

When the pipeline runs, both steps run in parallel.

You only pass the leaf nodes of the graph to the pipeline because the leaf nodes contain information about all the previous steps defined through data or custom dependencies. When it compiles the pipeline, SageMaker AI also infers of all of the subsequent steps that form the pipeline graph and adds each of them as a separate step to the pipeline.

## Create a pipeline
<a name="pipelines-step-decorator-pipeline-create"></a>

Create a pipeline by calling `pipeline.create()`, as shown in the following snippet. For details about `create()`, see [sagemaker.workflow.pipeline.Pipeline.create](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.create).

```
role = "pipeline-role"
pipeline.create(role)
```

When you call `pipeline.create()`, SageMaker AI compiles all of the steps defined as part of the pipeline instance. SageMaker AI uploads the serialized function, arguments, and all the other step-related artifacts to Amazon S3.

Data resides in the S3 bucket according to the following structure:

```
s3_root_uri/
    pipeline_name/
        sm_rf_user_ws/
            workspace.zip  # archive of the current working directory (workdir)
        step_name/
            timestamp/
                arguments/                # serialized function arguments
                function/                 # serialized function
                pre_train_dependencies/   # any dependencies and pre_execution scripts provided for the step       
        execution_id/
            step_name/
                results     # returned output from the serialized function including the model
```

`s3_root_uri` is defined in the SageMaker AI config file and applies to the entire pipeline. If undefined, the default SageMaker AI bucket is used.

**Note**  
Every time SageMaker AI compiles a pipeline, SageMaker AI saves the the steps' serialized functions, arguments and dependencies in a folder timestamped with the current time. This occurs every time you run `pipeline.create()`, `pipeline.update()`, `pipeline.upsert()` or `pipeline.definition()`.

# Run a pipeline
<a name="pipelines-step-decorator-run-pipeline"></a>

The following page describes how to run a pipeline with Amazon SageMaker Pipelines, either with SageMaker AI resources or locally.

Start a new pipeline run with the `pipeline.start()` function as you would for a traditional SageMaker AI pipeline run. For information about the `start()` function, see [sagemaker.workflow.pipeline.Pipeline.start](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline.Pipeline.start).

**Note**  
A step defined using the `@step` decorator runs as a training job. Therefore, be aware of the following limits:  
Instance limits and training job limits in your accounts. Update your limits accordingly to avoid any throttling or resource limit issues.
The monetary costs associated with every run of a training step in the pipeline. For more details, refer to [Amazon SageMaker Pricing](https://aws.amazon.com/sagemaker/pricing/).

## Retrieve results from a pipeline run locally
<a name="pipelines-step-decorator-run-pipeline-retrieve"></a>

To view the result of any step of a pipeline run, use [execution.result()](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#sagemaker.workflow.pipeline._PipelineExecution.result           ), as shown in the following snippet:

```
execution = pipeline.start()
execution.result(step_name="train")
```

**Note**  
Pipelines does not support `execution.result()` in local mode.

You can only retrieve results for one step at a time. If the step name was generated by SageMaker AI, you can retrieve the step name by calling `list_steps` as follows:

```
execution.list_step()
```

## Run a pipeline locally
<a name="pipelines-step-decorator-run-pipeline-local"></a>

You can run a pipeline with `@step`-decorated steps locally as you would for traditional pipeline steps. For details about local mode pipeline runs, see [Run pipelines using local mode](pipelines-local-mode.md). To use local mode, provide a `LocalPipelineSession` instead of a `SageMakerSession` to your pipeline definition, as shown in the following example:

```
from sagemaker.workflow.function_step import step
from sagemaker.workflow.pipeline import Pipeline
from sagemaker.workflow.pipeline_context import LocalPipelineSession

@step
def train():
    training_data = s3.download(....)
    ...
    return trained_model
    
step_train_result = train()

local_pipeline_session = LocalPipelineSession()

local_pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[step_train_result],
    sagemaker_session=local_pipeline_session # needed for local mode
)

local_pipeline.create(role_arn="role_arn")

# pipeline runs locally
execution = local_pipeline.start()
```

# Configure your pipeline
<a name="pipelines-step-decorator-cfg-pipeline"></a>

You are advised to use the SageMaker AI config file to set the defaults for the pipeline. For information about the SageMaker AI configuration file, see [Configuring and using defaults with the SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/overview.html#configuring-and-using-defaults-with-the-sagemaker-python-sdk). Any configuration added to the config file applies to all steps in the pipeline. If you want to override options for any of the steps, provide new values in the `@step` decorator arguments. The following topic describes how to set up a config file.

The `@step` decorator's configuration in the config file is identical to the `@remote` decorator's configuration. To set up the pipeline role ARN and pipeline tags in the config file, use the `Pipeline` section shown in the following snippet:

```
SchemaVersion: '1.0'
SageMaker:
  Pipeline:
    RoleArn: 'arn:aws:iam::555555555555:role/IMRole'
    Tags:
    - Key: 'tag_key'
      Value: 'tag_value'
```

For most of the defaults you can set in the configuration file you can also override by passing new values to the `@step` decorator. For example, you can override the instance type set in the config file for your preprocessing step, as shown in the following example:

```
@step(instance_type="ml.m5.large")
def preprocess(raw_data):
    df = pandas.read_csv(raw_data)
    ...
    return procesed_dataframe
```

A few arguments are not part of the `@step` decorator parameters list—these can be configured for the entire pipeline only through the SageMaker AI configuration file. They are listed as follows:
+ `sagemaker_session` (`sagemaker.session.Session`): The underlying SageMaker AI session to which SageMaker AI delegates service calls. If unspecified, a session is created using a default configuration as follows:

  ```
  SageMaker:
    PythonSDK:
      Modules:
        Session:
          DefaultS3Bucket: 'default_s3_bucket'
          DefaultS3ObjectKeyPrefix: 'key_prefix'
  ```
+ `custom_file_filter` (`CustomFileFilter)`: A `CustomFileFilter` object that specifies the local directories and files to include in the pipeline step. If unspecified, this value defaults to `None`. For `custom_file_filter` to take effect, you must set `IncludeLocalWorkdir` to `True`. The following example shows a configuration that ignores all notebook files, and files and directories named `data`.

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          IncludeLocalWorkDir: true
          CustomFileFilter: 
            IgnoreNamePatterns: # files or directories to ignore
            - "*.ipynb" # all notebook files
            - "data" # folder or file named "data"
  ```

  For more details about how to use `IncludeLocalWorkdir` with `CustomFileFilter`, see [Using modular code with the @remote decorator](train-remote-decorator-modular.md).
+ `s3_root_uri (str)`: The root Amazon S3 folder to which SageMaker AI uploads the code archives and data. If unspecified, the default SageMaker AI bucket is used.
+ `s3_kms_key (str)`: The key used to encrypt the input and output data. You can only configure this argument in the SageMaker AI config file and the argument applies to all steps defined in the pipeline. If unspecified, the value defaults to `None`. See the following snippet for an example S3 KMS key configuration:

  ```
  SchemaVersion: '1.0'
  SageMaker:
    PythonSDK:
      Modules:
        RemoteFunction:
          S3KmsKeyId: 's3kmskeyid'
          S3RootUri: 's3://amzn-s3-demo-bucket/my-project
  ```

# Best Practices
<a name="pipelines-step-decorator-best"></a>

The following sections suggest best practices to follow when you use the `@step` decorator for your pipeline steps.

## Use warm pools
<a name="pipelines-step-decorator-best-warmpool"></a>

For faster pipeline step runs, use the warm pooling functionality provided for training jobs. You can turn on the warm pool functionality by providing the `keep_alive_period_in_seconds` argument to the `@step` decorator as demonstrated in the following snippet:

```
@step(
   keep_alive_period_in_seconds=900
)
```

For more information about warm pools, see [SageMaker AI Managed Warm Pools](train-warm-pools.md). 

## Structure your directory
<a name="pipelines-step-decorator-best-dir"></a>

You are advised to use code modules while using the `@step` decorator. Put the `pipeline.py` module, in which you invoke the step functions and define the pipeline, at the root of the workspace. The recommended structure is shown as follows:

```
.
├── config.yaml # the configuration file that define the infra settings
├── requirements.txt # dependencies
├── pipeline.py  # invoke @step-decorated functions and define the pipeline here
├── steps/
| ├── processing.py
| ├── train.py
├── data/
├── test/
```

# Limitations
<a name="pipelines-step-decorator-limit"></a>

The following sections outline the limitations that you should be aware of when you use the `@step` decorator for your pipeline steps.

## Function argument limitations
<a name="pipelines-step-decorator-arg"></a>

When you pass an input argument to the `@step`-decorated function, the following limitations apply:
+ You can pass the `DelayedReturn`, `Properties` (of steps of other types), `Parameter`, and `ExecutionVariable` objects to `@step`-decorated functions as arguments. But `@step`-decorated functions do not support `JsonGet` and `Join` objects as arguments.
+ You cannot directly access a pipeline variable from a `@step` function. The following example produces an error:

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func():
      print(param)
  
  func() # this raises a SerializationError
  ```
+ You cannot nest a pipeline variable in another object and pass it to a `@step` function. The following example produces an error:

  ```
  param = ParameterInteger(name="<parameter-name>", default_value=10)
  
  @step
  def func(arg):
      print(arg)
  
  func(arg=(param,)) # this raises a SerializationError because param is nested in a tuple
  ```
+ Since inputs and outputs of a function are serialized, there are restrictions on the type of data that can be passed as input or output from a function. See the *Data serialization and deserialization* section of [Invoke a remote function](train-remote-decorator-invocation.md) for more details. The same restrictions apply to `@step`-decorated functions.
+ Any object that has a boto client cannot be serialized, hence you cannot pass such objects as input to or output from a `@step`-decorated function. For example, SageMaker Python SDK client classes such as `Estimator`, `Predictor`, and `Processor` can't be serialized.

## Function imports
<a name="pipelines-step-decorator-best-import"></a>

You should import the libraries required by the step inside rather than outside the function. If you import them at global scope, you risk an import collision while serializing the function. For example, `sklearn.pipeline.Pipeline` could be overridden by `sagemaker.workflow.pipeline.Pipeline`.

## Referencing child members of function return value
<a name="pipelines-step-decorator-best-child"></a>

If you reference child members of a `@step`-decorated function's return value, the following limitations apply:
+ You can reference the child members with `[]` if the `DelayedReturn` object represents a tuple, list or dict, as shown in the following example:

  ```
  delayed_return[0]
  delayed_return["a_key"]
  delayed_return[1]["a_key"]
  ```
+ You cannot unpack a tuple or list output because the exact length of the underlying tuple or list can't be known when you invoke the function. The following example produces an error:

  ```
  a, b, c = func() # this raises ValueError
  ```
+ You cannot iterate over a `DelayedReturn` object. The following example raises an error:

  ```
  for item in func(): # this raises a NotImplementedError
  ```
+ You cannot reference arbitrary child members with '`.`'. The following example produces an error:

  ```
  delayed_return.a_child # raises AttributeError
  ```

## Existing pipeline features that are not supported
<a name="pipelines-step-decorator-best-unsupported"></a>

You cannot use the `@step` decorator with the following pipeline features:
+ [Pipeline step caching](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-caching.html)
+ [Property files](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-propertyfile.html#build-and-manage-propertyfile-property)

# Pass Data Between Steps
<a name="build-and-manage-propertyfile"></a>

When building pipelines with Amazon SageMaker Pipelines, you might need to pass data from one step to the next. For example, you might want to use the model artifacts generated by a training step as input to a model evaluation or deployment step. You can use this functionality to create interdependent pipeline steps and build your ML workflows.

When you need to retrieve information from the output of a pipeline step, you can use `JsonGet`. `JsonGet` helps you extract information from Amazon S3 or property files. The following sections explain methods you can use to extract step outputs with `JsonGet`.

## Pass data between steps with Amazon S3
<a name="build-and-manage-propertyfile-s3"></a>

You can use `JsonGet` in a `ConditionStep` to fetch the JSON output directly from Amazon S3. The Amazon S3 URI can be a `Std:Join` function containing primitive strings, pipeline run variables, or pipeline parameters. The following example shows how you can use `JsonGet` in a `ConditionStep`:

```
# Example json file in s3 bucket generated by a processing_step
{
   "Output": [5, 10]
}

cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name="<step-name>",
        s3_uri="<s3-path-to-json>",
        json_path="Output[1]"
    ),
    right=6.0
)
```

If you are using `JsonGet` with an Amazon S3 path in the condition step, you must explicitly add a dependency between the condition step and the step generating the JSON output. In following example, the condition step is created with a dependency on the processing step:

```
cond_step = ConditionStep(
        name="<step-name>",
        conditions=[cond_lte],
        if_steps=[fail_step],
        else_steps=[register_model_step],
        depends_on=[processing_step],
)
```

## Pass data between steps with property files
<a name="build-and-manage-propertyfile-property"></a>

Use property files to store information from the output of a processing step. This is particularly useful when analyzing the results of a processing step to decide how a conditional step should be executed. The `JsonGet` function processes a property file and enables you to use JsonPath notation to query the property JSON file. For more information on JsonPath notation, see the [JsonPath repo](https://github.com/json-path/JsonPath).

To store a property file for later use, you must first create a `PropertyFile` instance with the following format. The `path` parameter is the name of the JSON file to which the property file is saved. Any `output_name` must match the `output_name` of the `ProcessingOutput` that you define in your processing step. This enables the property file to capture the `ProcessingOutput` in the step.

```
from sagemaker.workflow.properties import PropertyFile

<property_file_instance> = PropertyFile(
    name="<property_file_name>",
    output_name="<processingoutput_output_name>",
    path="<path_to_json_file>"
)
```

When you create your `ProcessingStep` instance, add the `property_files` parameter to list all of the parameter files that the Amazon SageMaker Pipelines service must index. This saves the property file for later use.

```
property_files=[<property_file_instance>]
```

To use your property file in a condition step, add the `property_file` to the condition that you pass to your condition step as shown in the following example to query the JSON file for your desired property using the `json_path` parameter.

```
cond_lte = ConditionLessThanOrEqualTo(
    left=JsonGet(
        step_name=step_eval.name,
        property_file=<property_file_instance>,
        json_path="mse"
    ),
    right=6.0
)
```

For more in-depth examples, see *[Property File](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#property-file)* in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).

# Caching pipeline steps
<a name="pipelines-caching"></a>

In Amazon SageMaker Pipelines, you can use step caching to save time and resources when rerunning pipelines. Step caching reuses the output of a previous successful run of a step (instead of recomputing it) when the step has the same configuration and inputs. This helps you achieve consistent results across pipeline reruns with identical parameters. The following topic shows you how to configure and turn on step caching for your pipelines.

When you use step signature caching, Pipelines tries to find a previous run of your current pipeline step with the same values for certain attributes. If found, Pipelines propagates the outputs from the previous run rather than recomputing the step. The attributes checked are specific to the step type, and are listed in [Default cache key attributes by pipeline step type](pipelines-default-keys.md).

You must opt in to step caching — it is off by default. When you turn on step caching, you must also define a timeout. This timeout defines how old a previous run can be to remain a candidate for reuse.

Step caching only considers successful runs — it never reuses failed runs. When multiple successful runs exist within the timeout period, Pipelines uses the result for the most recent successful run. If no successful runs match in the timeout period, Pipelines reruns the step. If the executor finds a previous run that meets the criteria but is still in progress, both steps continue running and update the cache if they're successful.

Step caching is only scoped for individual pipelines, so you can’t reuse a step from another pipeline even if there is a step signature match.

Step caching is available for the following step types: 
+ [Processing](build-and-manage-steps-types.md#step-type-processing)
+ [Training](build-and-manage-steps-types.md#step-type-training)
+ [Tuning](build-and-manage-steps-types.md#step-type-tuning)
+ [AutoML](build-and-manage-steps-types.md#step-type-automl)
+ [Transform](build-and-manage-steps-types.md#step-type-transform)
+ [`ClarifyCheck`](build-and-manage-steps-types.md#step-type-clarify-check)
+ [`QualityCheck`](build-and-manage-steps-types.md#step-type-quality-check)
+ [EMR](build-and-manage-steps-types.md#step-type-emr)

**Topics**
+ [Turn on step caching](pipelines-caching-enabling.md)
+ [Turn off step caching](pipelines-caching-disabling.md)
+ [Default cache key attributes by pipeline step type](pipelines-default-keys.md)
+ [Cached data access control](pipelines-access-control.md)

# Turn on step caching
<a name="pipelines-caching-enabling"></a>

To turn on step caching, you must add a `CacheConfig` property to the step definition. `CacheConfig` properties use the following format in the pipeline definition file:

```
{
    "CacheConfig": {
        "Enabled": false,
        "ExpireAfter": "<time>"
    }
}
```

The `Enabled` field indicates whether caching is turned on for the particular step. You can set the field to `true`, which tells SageMaker AI to try to find a previous run of the step with the same attributes. Or, you can set the field to `false`, which tells SageMaker AI to run the step every time the pipeline runs. `ExpireAfter` is a string in [ISO 8601 duration](https://en.wikipedia.org/wiki/ISO_8601#Durations) format that defines the timeout period. The `ExpireAfter` duration can be a year, month, week, day, hour, or minute value. Each value consists of a number followed by a letter indicating the unit of duration. For example:
+ "30d" = 30 days
+ "5y" = 5 years
+ "T16m" = 16 minutes
+ "30dT5h" = 30 days and 5 hours.

The following discussion describes the procedure to turn on caching for new or pre-existing pipelines using the Amazon SageMaker Python SDK.

**Turn on caching for new pipelines**

For new pipelines, initialize a `CacheConfig` instance with `enable_caching=True` and provide it as an input to your pipeline step. The following example turns on caching with a 1-hour timeout period for a training step: 

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
      
cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
estimator = Estimator(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=estimator.fit(inputs=inputs),
    cache_config=cache_config
)
```

**Turn on caching for pre-existing pipelines**

To turn on caching for pre-existing, already-defined pipelines, turn on the `enable_caching` property for the step, and set `expire_after` to a timeout value. Lastly, update the pipeline with `pipeline.upsert()` or `pipeline.update()`. Once you run it again, the following code example turns on caching with a 1-hour timeout period for a training step:

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline import Pipeline

cache_config = CacheConfig(enable_caching=True, expire_after="PT1H")
estimator = Estimator(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=estimator.fit(inputs=inputs),
    cache_config=cache_config
)

# define pipeline
pipeline = Pipeline(
    steps=[step_train]
)

# additional step for existing pipelines
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

Alternatively, update the cache config after you have already defined the (pre-existing) pipeline, allowing one continuous code run. The following code sample demonstrates this method:

```
# turn on caching with timeout period of one hour
pipeline.steps[0].cache_config.enable_caching = True 
pipeline.steps[0].cache_config.expire_after = "PT1H" 

# additional step for existing pipelines
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

For more detailed code examples and a discussion about how Python SDK parameters affect caching, see [ Caching Configuration](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#caching-configuration) in the Amazon SageMaker Python SDK documentation.

# Turn off step caching
<a name="pipelines-caching-disabling"></a>

A pipeline step does not rerun if you change any attributes that are not listed in [Default cache key attributes by pipeline step type](pipelines-default-keys.md) for its step type. However, you may decide that you want the pipeline step to rerun anyway. In this case, you need to turn off step caching.

To turn off step caching, set the `Enabled` attribute in the step definition’s `CacheConfig` property in the step definition to `false`, as shown in the following code snippet:

```
{
    "CacheConfig": {
        "Enabled": false,
        "ExpireAfter": "<time>"
    }
}
```

Note that the `ExpireAfter` attribute is ignored when `Enabled` is `false`.

To turn off caching for a pipeline step using the Amazon SageMaker Python SDK, define the pipeline of your pipeline step, turn off the `enable_caching` property, and update the pipeline.

Once you run it again, the following code example turns off caching for a training step:

```
from sagemaker.workflow.pipeline_context import PipelineSession
from sagemaker.workflow.steps import CacheConfig
from sagemaker.workflow.pipeline import Pipeline

cache_config = CacheConfig(enable_caching=False, expire_after="PT1H")
estimator = Estimator(..., sagemaker_session=PipelineSession())

step_train = TrainingStep(
    name="TrainAbaloneModel",
    step_args=estimator.fit(inputs=inputs),
    cache_config=cache_config
)

# define pipeline
pipeline = Pipeline(
    steps=[step_train]
)

# update the pipeline
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

Alternatively, turn off the `enable_caching` property after you have already defined the pipeline, allowing one continuous code run. The following code sample demonstrates this solution:

```
# turn off caching for the training step
pipeline.steps[0].cache_config.enable_caching = False

# update the pipeline
pipeline.update()
# or, call upsert() to update the pipeline
# pipeline.upsert()
```

For more detailed code examples and a discussion about how Python SDK parameters affect caching, see [Caching Configuration](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#caching-configuration) in the Amazon SageMaker Python SDK documentation.

# Default cache key attributes by pipeline step type
<a name="pipelines-default-keys"></a>

When deciding whether to reuse a previous pipeline step or rerun the step, Pipelines checks to see if certain attributes have changed. If the set of attributes is different from all previous runs within the timeout period, the step runs again. These attributes include input artifacts, app or algorithm specification, and environment variables. The following list shows each pipeline step type and the attributes that, if changed, initiate a rerun of the step. For more information about which Python SDK parameters are used to create the following attributes, see [ Caching Configuration](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#caching-configuration) in the Amazon SageMaker Python SDK documentation.

## [Processing step](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateProcessingJob.html)
<a name="collapsible-caching-section-1"></a>
+ AppSpecification
+ Environment
+ ProcessingInputs. This attribute contains information about the preprocessing script.

  

## [Training step](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTrainingJob.html)
<a name="collapsible-caching-section-2"></a>
+ AlgorithmSpecification
+ CheckpointConfig
+ DebugHookConfig
+ DebugRuleConfigurations
+ Environment
+ HyperParameters
+ InputDataConfig. This attribute contains information about the training script.

  

## [Tuning step](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateHyperParameterTuningJob.html)
<a name="collapsible-caching-section-3"></a>
+ HyperParameterTuningJobConfig
+ TrainingJobDefinition. This attribute is composed of multiple child attributes, not all of which cause the step to rerun. The child attributes that could incur a rerun (if changed) are:
  + AlgorithmSpecification
  + HyperParameterRanges
  + InputDataConfig
  + StaticHyperParameters
  + TuningObjective
+ TrainingJobDefinitions

  

## [AutoML step](https://docs.aws.amazon.com//sagemaker/latest/APIReference/API_AutoMLJobConfig.html)
<a name="collapsible-caching-section-4"></a>
+ AutoMLJobConfig. This attribute is composed of multiple child attributes, not all of which cause the step to rerun. The child attributes that could incur a rerun (if changed) are:
  + CompletionCriteria
  + CandidateGenerationConfig
  + DataSplitConfig
  + Mode
+ AutoMLJobObjective
+ InputDataConfig
+ ProblemType

  

## [Transform step](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateTransformJob.html)
<a name="collapsible-caching-section-5"></a>
+ DataProcessing
+ Environment
+ ModelName
+ TransformInput

  

## [ClarifyCheck step](build-and-manage-steps-types.md#step-type-clarify-check)
<a name="collapsible-caching-section-6"></a>
+ ClarifyCheckConfig
+ CheckJobConfig
+ SkipCheck
+ RegisterNewBaseline
+ ModelPackageGroupName
+ SuppliedBaselineConstraints

  

## [QualityCheck step](build-and-manage-steps-types.md#step-type-quality-check)
<a name="collapsible-caching-section-7"></a>
+ QualityCheckConfig
+ CheckJobConfig
+ SkipCheck
+ RegisterNewBaseline
+ ModelPackageGroupName
+ SuppliedBaselineConstraints
+ SuppliedBaselineStatistics

  

## [EMR Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-emr)
<a name="collapsible-caching-section-8"></a>
+ ClusterId
+ StepConfig

  

# Cached data access control
<a name="pipelines-access-control"></a>

When a SageMaker AI pipeline runs, it caches the parameters and metadata associated with the SageMaker AI jobs launched by the pipeline and saves them for reuse in subsequent runs. This metadata is accessible through a variety of sources in addition to cached pipeline steps, and includes the following types:
+ `Describe*Job` requests
+ CloudWatch Logs
+ CloudWatch Events
+ CloudWatch Metrics
+ SageMaker AI Search

Note that access to each data source in the list is controlled by its own set of IAM permissions. Removing a particular role’s access to one data source does not affect the level of access to the others. For example, an account admin might remove IAM permissions for `Describe*Job` requests from a caller’s role. While the caller can no longer make `Describe*Job` requests, they can still retrieve the metadata from a pipeline run with cached steps as long as they have permission to run the pipeline. If an account admin wants to remove access to the metadata from a particular SageMaker AI job completely, they need to remove permissions for each of the relevant services that provide access to the data. 

# Retry Policy for Pipeline Steps
<a name="pipelines-retry-policy"></a>

Retry policies help you automatically retry your Pipelines steps after an error occurs. Any pipeline step can encounter exceptions, and exceptions happen for various reasons. In some cases, a retry can resolve these issues. With a retry policy for pipeline steps, you can choose whether to retry a particular pipeline step or not.

The retry policy only supports the following pipeline steps:
+ [Processing step](build-and-manage-steps-types.md#step-type-processing) 
+ [Training step](build-and-manage-steps-types.md#step-type-training) 
+ [Tuning step](build-and-manage-steps-types.md#step-type-tuning) 
+ [AutoML step](build-and-manage-steps-types.md#step-type-automl) 
+ [Create model step](build-and-manage-steps-types.md#step-type-create-model) 
+ [Register model step](build-and-manage-steps-types.md#step-type-register-model) 
+ [Transform step](build-and-manage-steps-types.md#step-type-transform) 
+ [Notebook job step](build-and-manage-steps-types.md#step-type-notebook-job) 

**Note**  
Jobs running inside both the tuning and AutoML steps conduct retries internally and will not retry the `SageMaker.JOB_INTERNAL_ERROR` exception type, even if a retry policy is configured. You can program your own [ Retry Strategy](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_RetryStrategy.html) using the SageMaker API.

## Supported exception types for the retry policy
<a name="pipelines-retry-policy-supported-exceptions"></a>

The retry policy for pipeline steps supports the following exception types:
+ `Step.SERVICE_FAULT`: These exceptions occur when an internal server error or transient error happens when calling downstream services. Pipelines retries on this type of error automatically. With a retry policy, you can override the default retry operation for this exception type.
+ `Step.THROTTLING`: Throttling exceptions can occur while calling the downstream services. Pipelines retries on this type of error automatically. With a retry policy, you can override the default retry operation for this exception type.
+ `SageMaker.JOB_INTERNAL_ERROR`: These exceptions occur when the SageMaker AI job returns `InternalServerError`. In this case, starting a new job may fix a transient issue.
+ `SageMaker.CAPACITY_ERROR`: The SageMaker AI job may encounter Amazon EC2 `InsufficientCapacityErrors`, which leads to the SageMaker AI job’s failure. You can retry by starting a new SageMaker AI job to avoid the issue. 
+ `SageMaker.RESOURCE_LIMIT`: You can exceeed the resource limit quota when running a SageMaker AI job. You can wait and retry running the SageMaker AI job after a short period and see if resources are released.

## The JSON schema for the retry policy
<a name="pipelines-retry-policy-json-schema"></a>

The retry policy for Pipelines has the following JSON schema:

```
"RetryPolicy": {
   "ExceptionType": [String]
   "IntervalSeconds": Integer
   "BackoffRate": Double
   "MaxAttempts": Integer
   "ExpireAfterMin": Integer
}
```
+ `ExceptionType`: This field requires the following exception types in a string array format.
  + `Step.SERVICE_FAULT`
  + `Step.THROTTLING`
  + `SageMaker.JOB_INTERNAL_ERROR`
  + `SageMaker.CAPACITY_ERROR`
  + `SageMaker.RESOURCE_LIMIT`
+ `IntervalSeconds` (optional): The number of seconds before the first retry attempt (1 by default). `IntervalSeconds` has a maximum value of 43200 seconds (12 hours).
+ `BackoffRate` (optional): The multiplier by which the retry interval increases during each attempt (2.0 by default).
+ `MaxAttempts` (optional): A positive integer that represents the maximum number of retry attempts (5 by default). If the error recurs more times than `MaxAttempts` specifies, retries cease and normal error handling resumes. A value of 0 specifies that errors are never retried. `MaxAttempts` has a maximum value of 20.
+ `ExpireAfterMin` (optional): A positive integer that represents the maximum timespan of retry. If the error recurs after `ExpireAfterMin` minutes counting from the step gets executed, retries cease and normal error handling resumes. A value of 0 specifies that errors are never retried. `ExpireAfterMin ` has a maximum value of 14,400 minutes (10 days).
**Note**  
Only one of `MaxAttempts` or `ExpireAfterMin` can be given, but not both; if both are *not* specified, `MaxAttempts` becomes the default. If both properties are identified within one policy, then the retry policy generates a validation error.

# Configuring a retry policy
<a name="pipelines-configuring-retry-policy"></a>

While SageMaker Pipelines provide a robust and automated way to orchestrate machine learning workflows, you might encounter failures when you run them. To handle such scenarios gracefully and improve the reliability of your pipelines, you can configure retry policies that define how and when to automatically retry specific steps after encountering an exception. The retry policy allows you to specify the types of exceptions to retry, the maximum number of retry attempts, the interval between retries, and the backoff rate for increasing the retry intervals. The following section provides examples of how to configure a retry policy for a training step in your pipeline, both in JSON and using the SageMaker Python SDK.

The following is an example of a training step with a retry policy.

```
{
    "Steps": [
        {
            "Name": "MyTrainingStep",
            "Type": "Training",
            "RetryPolicies": [
                {
                    "ExceptionType": [
                        "SageMaker.JOB_INTERNAL_ERROR",
                        "SageMaker.CAPACITY_ERROR"
                    ],
                    "IntervalSeconds": 1,
                    "BackoffRate": 2,
                    "MaxAttempts": 5
                }
            ]
        }
    ]
}
```



The following is an example of how to build a `TrainingStep` in SDK for Python (Boto3) with a retry policy.

```
from sagemaker.workflow.retry import (
    StepRetryPolicy, 
    StepExceptionTypeEnum,
    SageMakerJobExceptionTypeEnum,
    SageMakerJobStepRetryPolicy
)

step_train = TrainingStep(
    name="MyTrainingStep",
    xxx,
    retry_policies=[
        // override the default 
        StepRetryPolicy(
            exception_types=[
                StepExceptionTypeEnum.SERVICE_FAULT, 
                StepExceptionTypeEnum.THROTTLING
            ],
            expire_after_mins=5,
            interval_seconds=10,
            backoff_rate=2.0 
        ),
        // retry when resource limit quota gets exceeded
        SageMakerJobStepRetryPolicy(
            exception_types=[SageMakerJobExceptionTypeEnum.RESOURCE_LIMIT],
            expire_after_mins=120,
            interval_seconds=60,
            backoff_rate=2.0
        ),
        // retry when job failed due to transient error or EC2 ICE.
        SageMakerJobStepRetryPolicy(
            failure_reason_types=[
                SageMakerJobExceptionTypeEnum.INTERNAL_ERROR,
                SageMakerJobExceptionTypeEnum.CAPACITY_ERROR,
            ],
            max_attempts=10,
            interval_seconds=30,
            backoff_rate=2.0
        )
    ]
)
```

For more information on configuring retry behavior for certain step types, see *[Amazon SageMaker Pipelines - Retry Policy](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#retry-policy)* in the Amazon SageMaker Python SDK documentation.

# Selective execution of pipeline steps
<a name="pipelines-selective-ex"></a>

As you use Pipelines to create workflows and orchestrate your ML training steps, you might need to undertake multiple experimentation phases. Instead of running the full pipeline each time, you might only want to repeat certain steps. With Pipelines, you can execute pipeline steps selectively. This helps optimize your ML training. Selective execution is useful in the following scenarios: 
+ You want to restart a specific step with updated instance type, hyperparameters, or other variables while keeping the parameters from upstream steps.
+ Your pipeline fails an intermediate step. Previous steps in the execution, such as data preparation or feature extraction, are expensive to rerun. You might need to introduce a fix and rerun certain steps manually to complete the pipeline. 

Using selective execution, you can choose to run any subset of steps as long as they are connected in the directed acyclic graph (DAG) of your pipeline. The following DAG shows an example pipeline workflow:

![\[A directed acyclic graph (DAG) of an example pipeline.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/pipeline-full.png)


You can select steps `AbaloneTrain` and `AbaloneEval` in a selective execution, but you cannot select just `AbaloneTrain` and `AbaloneMSECond` steps because these steps are not connected in the DAG. For non-selected steps in the workflow, the selective execution reuses the outputs from a reference pipeline execution rather than rerunning the steps. Also, non-selected steps that are downstream from the selected steps do not run in a selective execution. 

If you choose to run a subset of intermediate steps in your pipeline, your steps may depend on previous steps. SageMaker AI needs a reference pipeline execution from which to resource these dependencies. For example, if you choose to run the steps `AbaloneTrain` and `AbaloneEval`, you need the outputs from the `AbaloneProcess` step. You can either provide a reference execution ARN or direct SageMaker AI to use the latest pipeline execution, which is the default behavior. If you have a reference execution, you can also build the runtime parameters from your reference run and supply them to your selective executive run with overrides. For details, see [Reuse runtime parameter values from a reference execution](#pipelines-selective-ex-reuse).

In detail, you provide a configuration for your selective execution pipeline run using `SelectiveExecutionConfig`. If you include an ARN for a reference pipeline execution (with the `source_pipeline_execution_arn` argument), SageMaker AI uses the previous step dependencies from the pipeline execution you provided. If you do not include an ARN and a latest pipeline execution exists, SageMaker AI uses it as a reference by default. If you do not include an ARN and do not want SageMaker AI to use your latest pipeline execution, set `reference_latest_execution` to `False`. The pipeline execution which SageMaker AI ultimately uses as a reference, whether the latest or user-specified, must be in `Success` or `Failed` state.

The following table summarizes how SageMaker AI chooses a reference execution.


| The `source_pipeline_execution_arn` argument value | The `reference_latest_execution` argument value | The reference execution used | 
| --- | --- | --- | 
| A pipeline ARN | `True` or unspecified | The specified pipeline ARN | 
| A pipeline ARN | `False` | The specified pipeline ARN | 
| null or unspecified | `True` or unspecified | The latest pipeline execution | 
| null or unspecified | `False` | None—in this case, select steps without upstream dependencies | 

For more information about selective execution configuration requirements, see the [ sagemaker.workflow.selective\$1execution\$1config.SelectiveExecutionConfig ](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#selective-execution-config) documentation.

The following discussion includes examples for the cases in which you want to specify a pipeline reference execution, use the latest pipeline execution as a reference, or run selective execution without a reference pipeline execution.

## Selective execution with a user-specified pipeline reference
<a name="pipelines-selective-ex-arn"></a>

The following example demonstrates a selective execution of the steps `AbaloneTrain` and `AbaloneEval` using a reference pipeline execution.

```
from sagemaker.workflow.selective_execution_config import SelectiveExecutionConfig

selective_execution_config = SelectiveExecutionConfig(
    source_pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123123123123:pipeline/abalone/execution/123ab12cd3ef", 
    selected_steps=["AbaloneTrain", "AbaloneEval"]
)

selective_execution = pipeline.start(
    execution_display_name=f"Sample-Selective-Execution-1",
    parameters={"MaxDepth":6, "NumRound":60},
    selective_execution_config=selective_execution_config,
)
```

## Selective execution with the latest pipeline execution as a reference
<a name="pipelines-selective-ex-latest"></a>

The following example demonstrates a selective execution of the steps `AbaloneTrain` and `AbaloneEval` using the latest pipeline execution as a reference. Since SageMaker AI uses the latest pipeline execution by default, you can optionally set the `reference_latest_execution` argument to `True`.

```
# Prepare a new selective execution. Select only the first step in the pipeline without providing source_pipeline_execution_arn.
selective_execution_config = SelectiveExecutionConfig(
    selected_steps=["AbaloneTrain", "AbaloneEval"],
    # optional
    reference_latest_execution=True
)

# Start pipeline execution without source_pipeline_execution_arn
pipeline.start(
    execution_display_name=f"Sample-Selective-Execution-1",
    parameters={"MaxDepth":6, "NumRound":60},
    selective_execution_config=selective_execution_config,
)
```

## Selective execution without a reference pipeline
<a name="pipelines-selective-ex-none"></a>

The following example demonstrates a selective execution of the steps `AbaloneProcess` and `AbaloneTrain` without providing a reference ARN and turning off the option to use the latest pipeline run as a reference. SageMaker AI permits this configuration since this subset of steps doesn’t depend on previous steps.

```
# Prepare a new selective execution. Select only the first step in the pipeline without providing source_pipeline_execution_arn.
selective_execution_config = SelectiveExecutionConfig(
    selected_steps=["AbaloneProcess", "AbaloneTrain"],
    reference_latest_execution=False
)

# Start pipeline execution without source_pipeline_execution_arn
pipeline.start(
    execution_display_name=f"Sample-Selective-Execution-1",
    parameters={"MaxDepth":6, "NumRound":60},
    selective_execution_config=selective_execution_config,
)
```

## Reuse runtime parameter values from a reference execution
<a name="pipelines-selective-ex-reuse"></a>

You can build the parameters from your reference pipeline execution using `build_parameters_from_execution`, and supply the result to your selective execution pipeline. You can use the original parameters from the reference execution, or apply any overrides using the `parameter_value_overrides` argument.

The following example shows you how to build parameters from a reference execution and apply an override for the `MseThreshold` parameter.

```
# Prepare a new selective execution.
selective_execution_config = SelectiveExecutionConfig(
    source_pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123123123123:pipeline/abalone/execution/123ab12cd3ef",
    selected_steps=["AbaloneTrain", "AbaloneEval", "AbaloneMSECond"],
)
# Define a new parameters list to test.
new_parameters_mse={
    "MseThreshold": 5,
}

# Build parameters from reference execution and override with new parameters to test.
new_parameters = pipeline.build_parameters_from_execution(
    pipeline_execution_arn="arn:aws:sagemaker:us-west-2:123123123123:pipeline/abalone/execution/123ab12cd3ef",
    parameter_value_overrides=new_parameters_mse
)

# Start pipeline execution with new parameters.
execution = pipeline.start(
    selective_execution_config=selective_execution_config,
    parameters=new_parameters
)
```

# Baseline calculation, drift detection and lifecycle with ClarifyCheck and QualityCheck steps in Amazon SageMaker Pipelines
<a name="pipelines-quality-clarify-baseline-lifecycle"></a>

The following topic discusses how baselines and model versions evolve in the Amazon SageMaker Pipelines when using the [`ClarifyCheck`](build-and-manage-steps-types.md#step-type-clarify-check) and [`QualityCheck`](build-and-manage-steps-types.md#step-type-quality-check) steps.

For the `ClarifyCheck` step, a baseline is a single file that resides in the step properties with the suffix `constraints`. For the `QualityCheck` step, a baseline is a combination of two files that resides in the step properties: one with the suffix `statistics` and the other with the suffix `constraints`. In the following topics we discuss these properties with a prefix that describes how they are used, impacting baseline behavior and lifecycle in these two pipeline steps. For example, the `ClarifyCheck` step always calculates and assigns the new baselines in the `CalculatedBaselineConstraints` property and the `QualityCheck` step does the same in the `CalculatedBaselineConstraints` and `CalculatedBaselineStatistics` properties.

## Baseline calculation and registration for ClarifyCheck and QualityCheck steps
<a name="pipelines-quality-clarify-baseline-calculations"></a>

Both the `ClarifyCheck` and `QualityCheck` steps always calculate new baselines based on step inputs through the underlying processing job run. These newly calculated baselines are accessed through the properties with the prefix `CalculatedBaseline`. You can record these properties as the `ModelMetrics` of your model package in the [Model step](build-and-manage-steps-types.md#step-type-model). This model package can be registered with 5 different baselines. You can register it with one for each check type: data bias, model bias, and model explainability from running the `ClarifyCheck` step and model quality, and data quality from running the `QualityCheck` step. The `register_new_baseline` parameter dictates the value set in the properties with the prefix `BaselineUsedForDriftCheck` after a step runs.

The following table of potential use cases shows different behaviors resulting from the step parameters you can set for the `ClarifyCheck` and `QualityCheck` steps:


| Possible use case that you may consider for selecting this configuration  | `skip_check` / `register_new_baseline` | Does step do a drift check? | Value of step property `CalculatedBaseline` | Value of step property `BaselineUsedForDriftCheck` | 
| --- | --- | --- | --- | --- | 
| You are doing regular retraining with checks enabled to get a new model version, but you *want to carry over the previous baselines* as the `DriftCheckBaselines` in the model registry for your new model version. | False/ False | Drift check runs against existing baselines | New baselines calculated by running the step | Baseline from the latest approved model in Model Registry or the baseline supplied as step parameter | 
| You are doing regular retraining with checks enabled to get a new model version, but you *want to refresh the `DriftCheckBaselines` in the model registry with the newly calculated baselines* for your new model version. | False/ True | Drift check runs against existing baselines | New baselines calculated by running the step | Newly calculated baseline by running the step (value of property CalculatedBaseline) | 
| You are initiating the pipeline to retrain a new model version because there is a violation detected by Amazon SageMaker Model Monitor on an endpoint for a particular type of check, and you want to *skip this type of check against the previous baseline, but carry over the previous baseline as `DriftCheckBaselines` in the model registry* for your new model version. | True/ False | No drift check | New baselines calculated by running | Baseline from the latest approved model in the model registry or the baseline supplied as step parameter | 
| This happens in the following cases: [\[See the AWS documentation website for more details\]](http://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-quality-clarify-baseline-lifecycle.html)  | True/ True | No drift check | New baselines calculated by running the step | Newly calculated baseline by running the step (value of property CalculatedBaseline) | 

**Note**  
If you use scientific notation in your constraint, you need to convert to float. For a preprocessing script example of how to do this, see [Create a Model Quality Baseline](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-baseline.html).

When you register a model with [Model step](build-and-manage-steps-types.md#step-type-model), you can register the `BaselineUsedForDriftCheck` property as `DriftCheckBaselines`. These baseline files can then be used by Model Monitor for model and data quality checks. In addition, these baselines can also be used in the ClarifyCheckStep and `QualityCheck` step to compare newly trained models against the existing models that are registered in the model registry for future pipeline runs.

## Drift Detection against Previous Baselines in Pipelines
<a name="pipelines-quality-clarify-baseline-drift-detection"></a>

In the case of the `QualityCheck` step, when you initiate the pipeline for regular retraining to get a new model version, you may not want to run the training step if the data quality and the data bias has [Schema for Violations (constraint\$1violations.json file)](model-monitor-interpreting-violations.md) on the baselines of your previous approved model version. You also may not want to register the newly trained model version if the model quality, model bias, or model explainability violates the registered baseline of your previous approved model version when running the `ClarifyCheck` step. In these cases, you can enable the checks you want by setting the `skip_check` property of the corresponding check step set to `False`, resulting in the `ClarifyCheck` and `QualityCheck` step failing if violation is detected against previous baselines. The pipeline process then does not proceed so that the model drifted from the baseline isn't registered. `ClarifyCheck` and `QualityCheck` steps are able to get `DriftCheckBaselines` of the latest approved model version of a given model package group against which to compare. Previous baselines can also be supplied directly through `supplied_baseline_constraints` (in addition to `supplied_baseline_statistics` if it is a `QualityCheck` step) and are always prioritized over any baselines pulled from the model package group. 

## Baseline and model version lifecycle and evolution with Pipelines
<a name="pipelines-quality-clarify-baseline-evolution"></a>

By setting `register_new_baseline` of your `ClarifyCheck` and `QualityCheck` step to `False`, your previous baseline is accessible through the step property prefix `BaselineUsedForDriftCheck`. You can then register these baselines as the `DriftCheckBaselines` in the new model version when you register a model with [Model step](build-and-manage-steps-types.md#step-type-model). Once you approve this new model version in the model registry, the `DriftCheckBaseline` in this model version becomes available for the `ClarifyCheck` and `QualityCheck` steps in the next pipeline process. If you want to refresh the baseline of a certain check type for future model versions, you can set `register_new_baseline` to `True` so that the properties with prefix `BaselineUsedForDriftCheck` become the newly calculated baseline. In these ways, you can preserve your preferred baselines for a model trained in the future, or refresh the baselines for drift checks when needed, managing your baseline evolution and lifecycle throughout your model training iterations. 

The following diagram illustrates a model-version-centric view of the baseline evolution and lifecycle.

![\[A model-version-centric view of the baseline evolution and lifecycle.\]](http://docs.aws.amazon.com/sagemaker/latest/dg/images/pipelines/Baseline-Lifecycle.png)


# Schedule Pipeline Runs
<a name="pipeline-eventbridge"></a>

You can schedule your Amazon SageMaker Pipelines executions using [Amazon EventBridge](https://docs.aws.amazon.com/eventbridge/latest/userguide/what-is-amazon-eventbridge.html). Amazon SageMaker Pipelines is supported as a target in [Amazon EventBridge](https://docs.aws.amazon.com/eventbridge/latest/userguide/what-is-amazon-eventbridge.html). This allows you to initiate the execution of your model building pipeline based on any event in your event bus. With EventBridge, you can automate your pipeline executions and respond automatically to events such as training job or endpoint status changes. Events include a new file being uploaded to your Amazon S3 bucket, a change in status of your Amazon SageMaker AI endpoint due to drift, and *Amazon Simple Notification Service* (SNS) topics.

The following Pipelines actions can be automatically initiated:  
+  `StartPipelineExecution` 

For more information on scheduling SageMaker AI jobs, see [Automating SageMaker AI with Amazon EventBridge.](https://docs.aws.amazon.com/sagemaker/latest/dg/automating-sagemaker-with-eventbridge.html) 

**Topics**
+ [Schedule a Pipeline with Amazon EventBridge](#pipeline-eventbridge-schedule)
+ [Schedule a pipeline with the SageMaker Python SDK](#build-and-manage-scheduling)

## Schedule a Pipeline with Amazon EventBridge
<a name="pipeline-eventbridge-schedule"></a>

To start a pipeline execution with Amazon CloudWatch Events, you must create an EventBridge [rule](https://docs.aws.amazon.com/eventbridge/latest/APIReference/API_Rule.html). When you create a rule for events, you specify a target action to take when EventBridge receives an event that matches the rule. When an event matches the rule, EventBridge sends the event to the specified target and initiates the action defined in the rule. 

 The following tutorials show how to schedule a pipeline execution with EventBridge using the EventBridge console or the AWS CLI.  

### Prerequisites
<a name="pipeline-eventbridge-schedule-prerequisites"></a>
+ A role that EventBridge can assume with the `SageMaker::StartPipelineExecution` permission. This role can be created automatically if you create a rule from the EventBridge console; otherwise, you need to create this role yourself. For information on creating a SageMaker AI role, see [SageMaker Roles](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html).
+ An Amazon SageMaker AI Pipeline to schedule. To create an Amazon SageMaker AI Pipeline, see [Define a Pipeline](https://docs.aws.amazon.com/sagemaker/latest/dg/define-pipeline.html).

### Create an EventBridge rule using the EventBridge console
<a name="pipeline-eventbridge-schedule-console"></a>

 The following procedure shows how to create an EventBridge rule using the EventBridge console.  

1. Navigate to the [EventBridge console](https://console.aws.amazon.com/events). 

1. Select **Rules** on the left hand side. 

1.  Select `Create Rule`. 

1. Enter a name and description for your rule.

1.  Select how you want to initiate this rule. You have the following choices for your rule: 
   + **Event pattern**: Your rule is initiated when an event matching the pattern occurs. You can choose a predefined pattern that matches a certain type of event, or you can create a custom pattern. If you select a predefined pattern, you can edit the pattern to customize it. For more information on Event patterns, see [Event Patterns in CloudWatch Events](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/CloudWatchEventsandEventPatterns.html). 
   + **Schedule**: Your rule is initiated regularly on a specified schedule. You can use a fixed-rate schedule that initiates regularly for a specified number of minutes, hour, or weeks. You can also use a [cron expression](https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html#CronExpressions) to create a more fine-grained schedule, such as “the first Monday of each month at 8am.” Schedule is not supported on a custom or partner event bus. 

1. Select your desired Event bus. 

1. Select the target(s) to invoke when an event matches your event pattern or when the schedule is initiated. You can add up to 5 targets per rule. Select `SageMaker Pipeline` in the target dropdown list. 

1. Select the pipeline you want to initiate from the pipeline dropdown list. 

1. Add parameters to pass to your pipeline execution using a name and value pair. Parameter values can be static or dynamic. For more information on Amazon SageMaker AI Pipeline parameters, see [AWS::Events::Rule SagemakerPipelineParameters](https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-sagemaker-pipeline.html#aws-resource-sagemaker-pipeline-properties).
   + Static values are passed to the pipeline execution every time the pipeline is initiated. For example, if `{"Name": "Instance_type", "Value": "ml.4xlarge"}` is specified in the parameter list, then it is passed as a parameter in `StartPipelineExecutionRequest` every time EventBridge initiates the pipeline. 
   + Dynamic values are specified using a JSON path. EventBridge parses the value from an event payload, then passes it to the pipeline execution. For example: *`$.detail.param.value`* 

1. Select the role to use for this rule. You can either use an existing role or create a new one. 

1. (Optional) Add tags. 

1. Select `Create` to finalize your rule. 

 Your rule is now in effect and ready to initiate your pipeline executions. 

### Create an EventBridge rule using the [AWS CLI](https://docs.aws.amazon.com/cli/latest/reference/events/index.html)
<a name="pipeline-eventbridge-schedule-cli"></a>

 The following procedure shows how to create an EventBridge rule using the AWS CLI. 

1. Create a rule to be initiated. When creating an EventBridge rule using the AWS CLI, you have two options for how your rule is initiated, event pattern and schedule.
   +  **Event pattern**: Your rule is initiated when an event matching the pattern occurs. You can choose a predefined pattern that matches a certain type of event, or you can create a custom pattern. If you select a predefined pattern, you can edit the pattern to customize it.  You can create a rule with event pattern using the following command: 

     ```
     aws events put-rule --name <RULE_NAME> ----event-pattern <YOUR_EVENT_PATTERN> --description <RULE_DESCRIPTION> --role-arn <ROLE_TO_EXECUTE_PIPELINE> --tags <TAGS>
     ```
   +  **Schedule**: Your rule is initiated regularly on a specified schedule. You can use a fixed-rate schedule that initiates regularly for a specified number of minutes, hour, or weeks. You can also use a cron expression to create a more fine-grained schedule, such as “the first Monday of each month at 8am.” Schedule is not supported on a custom or partner event bus. You can create a rule with schedule using the following command: 

     ```
     aws events put-rule --name <RULE_NAME> --schedule-expression <YOUR_CRON_EXPRESSION> --description <RULE_DESCRIPTION> --role-arn <ROLE_TO_EXECUTE_PIPELINE> --tags <TAGS>
     ```

1. Add target(s) to invoke when an event matches your event pattern or when the schedule is initiated. You can add up to 5 targets per rule.  For each target, you must specify:  
   +  ARN: The resource ARN of your pipeline. 
   +  Role ARN: The ARN of the role EventBridge should assume to execute the pipeline. 
   +  Parameters:  Amazon SageMaker AI pipeline parameters to pass. 

1. Run the following command to pass a Amazon SageMaker AI pipeline as a target to your rule using [put-targets](https://docs.aws.amazon.com/cli/latest/reference/events/put-targets.html) : 

   ```
   aws events put-targets --rule <RULE_NAME> --event-bus-name <EVENT_BUS_NAME> --targets "[{\"Id\": <ID>, \"Arn\": <RESOURCE_ARN>, \"RoleArn\": <ROLE_ARN>, \"SageMakerPipelineParameter\": { \"SageMakerParameterList\": [{\"Name\": <NAME>, \"Value\": <VALUE>}]} }]"] 
   ```

## Schedule a pipeline with the SageMaker Python SDK
<a name="build-and-manage-scheduling"></a>

The following sections show you how to set up permissions to access EventBridge resources and create your pipeline schedule using the SageMaker Python SDK. 

### Required permissions
<a name="build-and-manage-scheduling-permissions"></a>

You need to have necessary permissions to use the pipeline scheduler. Complete the following steps to set up your permissions:

1. Attach the following minimum privilege policy to the IAM role used to create the pipeline triggers, or use the AWS managed policy `AmazonEventBridgeSchedulerFullAccess`.

------
#### [ JSON ]

****  

   ```
   {
       "Version":"2012-10-17",		 	 	 
       "Statement":
       [
           {
               "Action":
               [
                   "scheduler:ListSchedules",
                   "scheduler:GetSchedule",
                   "scheduler:CreateSchedule",
                   "scheduler:UpdateSchedule",
                   "scheduler:DeleteSchedule"
               ],
               "Effect": "Allow",
               "Resource":
               [
                   "*"
               ]
           },
           {
               "Effect": "Allow",
               "Action": "iam:PassRole",
               "Resource": "arn:aws:iam::*:role/*", 
               "Condition": {
                   "StringLike": {
                       "iam:PassedToService": "scheduler.amazonaws.com"
                   }
               }
           }
       ]
   }
   ```

------

1. Establish a trust relationship with EventBridge by adding the service principal `scheduler.amazonaws.com` to this role’s trust policy. Make sure you attach the following trust policy to the execution role if you launch the notebook in SageMaker Studio.

------
#### [ JSON ]

****  

```
{
    "Version":"2012-10-17",		 	 	 
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": [
                    "scheduler.amazonaws.com",
                    "sagemaker.amazonaws.com"
                ]
            },
        "Action": "sts:AssumeRole"
        }
    ]
}
```

------

### Create a pipeline schedule
<a name="build-and-manage-scheduling-create"></a>

Using the `PipelineSchedule` constructor, you can schedule a pipeline to run once or at a predetermined interval. A pipeline schedule must be of the type `at`, `rate`, or `cron`. This set of scheduling types is an extension of the [EventBridge scheduling options](https://docs.aws.amazon.com/scheduler/latest/UserGuide/schedule-types.html). For more information about how to use the `PipelineSchedule` class, see [sagemaker.workflow.triggers.PipelineSchedule](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#pipeline-schedule). The following example demonstrates how to create each scheduling type with `PipelineSchedule`.

```
from sagemaker.workflow.triggers import PipelineSchedule

# schedules a pipeline run for 12/13/2023 at time 10:15:20 UTC
my_datetime_schedule = PipelineSchedule(
    name="<schedule-name>", 
    at=datetime(2023, 12, 13, 10, 15, 20)
)

# schedules a pipeline run every 5 minutes
my_rate_schedule = PipelineSchedule(
    name="<schedule-name>", 
    rate=(5, "minutes")
)

# schedules a pipeline run at 10:15am UTC on the last Friday of each month during the years 2022 to 2023
my_cron_schedule = PipelineSchedule(
    name="<schedule-name>", 
    cron="15 10 ? * 6L 2022-2023"
)
```

**Note**  
If you create a one-time schedule and need to access the current time, use `datetime.utcnow()` instead of `datetime.now()`. The latter does not store the current zone context and results in an incorrect time passed to EventBridge.

### Attach the trigger to your pipeline
<a name="build-and-manage-scheduling-attach"></a>

To attach your `PipelineSchedule` to your pipeline, invoke the `put_triggers` call on your created pipeline object with a list of triggers. If you get a response ARN, you successfully created the schedule in your account and EventBridge begins to invoke the target pipeline at the time or rate specified. You must specify a role with correct permissions to attach triggers to a parent pipeline. If you don't provide one, Pipelines fetches the default role used to create the pipeline from the [configuration file](https://docs.aws.amazon.com/sagemaker/latest/dg/train-remote-decorator-config.html).

The following example demonstrates how to attach a schedule to a pipeline.

```
scheduled_pipeline = Pipeline(
    name="<pipeline-name>",
    steps=[...],
    sagemaker_session=<sagemaker-session>,
)
custom_schedule = PipelineSchedule(
    name="<schedule-name>", 
    at=datetime(year=2023, month=12, date=25, hour=10, minute=30, second=30)
)
scheduled_pipeline.put_triggers(triggers=[custom_schedule], role_arn=<role>)
```

### Describe current triggers
<a name="build-and-manage-scheduling-describe"></a>

To retrieve information about your created pipeline triggers, you can invoke the `describe_trigger()` API with the trigger name. This command returns details about the created schedule expression such as its start time, enabled state, and other useful information. The following snippet shows a sample invocation:

```
scheduled_pipeline.describe_trigger(name="<schedule-name>")
```

### Cleanup trigger resources
<a name="build-and-manage-scheduling-clean"></a>

Before you delete your pipeline, clean up existing triggers to avoid a resource leak in your account. You should delete the triggers before destroying the parent pipeline. You can delete your triggers by passing a list of trigger names to the `delete_triggers` API. The following snippet demonstrates how to delete triggers.

```
pipeline.delete_triggers(trigger_names=["<schedule-name>"])
```

**Note**  
Be aware of the following limitations when you delete your triggers:  
The option to delete the triggers by specifying trigger names is only available in the SageMaker Python SDK. Deleting the pipeline in the CLI or a `DeletePipeline` API call does not delete your triggers. As a result, the triggers become orphaned and SageMaker AI attempts to start a run for a non-existent pipeline.
Also, if you are using another notebook session or already deleted the pipeline target, clean up orphaned schedules through the scheduler [CLI](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/scheduler/delete-schedule.html) or EventBridge console.

# Amazon SageMaker Experiments Integration
<a name="pipelines-experiments"></a>

Amazon SageMaker Pipelines is closely integrated with Amazon SageMaker Experiments. By default, when Pipelines creates and executes a pipeline, the following SageMaker Experiments entities are created if they don't exist:
+ An experiment for the pipeline
+ A run group for every execution of the pipeline
+ A run that's added to the run group for each SageMaker AI job created in a pipeline execution step

You can compare metrics such as model training accuracy across multiple pipeline executions just as you can compare such metrics across multiple run groups of a SageMaker AI model training experiment.

The following sample shows the relevant parameters of the [Pipeline](https://github.com/aws/sagemaker-python-sdk/blob/v2.41.0/src/sagemaker/workflow/pipeline.py) class in the [Amazon SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable).

```
Pipeline(
    name="MyPipeline",
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
      ExecutionVariables.PIPELINE_NAME,
      ExecutionVariables.PIPELINE_EXECUTION_ID
    ),
    steps=[...]
)
```

If you don't want an experiment and run group created for the pipeline, set `pipeline_experiment_config` to `None`.

**Note**  
Experiments integration was introduced in the Amazon SageMaker Python SDK v2.41.0.

The following naming rules apply based on what you specify for the `ExperimentName` and `TrialName` parameters of `pipeline_experiment_config`:
+ If you don't specify `ExperimentName`, the pipeline `name` is used for the experiment name.

  If you do specify `ExperimentName`, it's used for the experiment name. If an experiment with that name exists, the pipeline-created run groups are added to the existing experiment. If an experiment with that name doesn't exist, a new experiment is created.
+ If you don't specify `TrialName`, the pipeline execution ID is used for the run group name.

  If you do specify `TrialName`, it's used for the run group name. If a run group with that name exists, the pipeline-created runs are added to the existing run group. If a run group with that name doesn't exist, a new run group is created.

**Note**  
The experiment entities aren't deleted when the pipeline that created the entities is deleted. You can use the SageMaker Experiments API to delete the entities.

For information about how to view the SageMaker AI Experiment entities associated with a pipeline, see [Access experiment data from a pipeline](pipelines-studio-experiments.md). For more information on SageMaker Experiments, see [Amazon SageMaker Experiments in Studio Classic](experiments.md).

The following sections show examples of the previous rules and how they are represented in the pipeline definition file. For more information on pipeline definition files, see [Pipelines overview](pipelines-overview.md).

**Topics**
+ [Default Behavior](pipelines-experiments-default.md)
+ [Disable Experiments Integration](pipelines-experiments-none.md)
+ [Specify a Custom Experiment Name](pipelines-experiments-custom-experiment.md)
+ [Specify a Custom Run Group Name](pipelines-experiments-custom-trial.md)

# Default Behavior
<a name="pipelines-experiments-default"></a>

**Create a pipeline**

The default behavior when creating a SageMaker AI Pipeline is to automatically integrate it with SageMaker Experiments. If you don't specify any custom configuration, SageMaker AI creates an experiment with the same name as the pipeline, a run group for each execution of the pipeline using the pipeline execution ID as the name, and individual runs within each run group for every SageMaker AI job launched as part of the pipeline steps. You can seamlessly track and compare metrics across different pipeline executions, similar to how you would analyze a model training experiment. The following section demonstrates this default behavior when defining a pipeline without explicitly configuring the experiment integration.

The `pipeline_experiment_config` is omitted. `ExperimentName` defaults to the pipeline `name`. `TrialName` defaults to the execution ID.

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    steps=[step_train]
)
```

**Pipeline definition file**

```
{
  "Version": "2020-12-01",
  "Parameters": [
    {
      "Name": "InputDataSource"
    },
    {
      "Name": "InstanceCount",
      "Type": "Integer",
      "DefaultValue": 1
    }
  ],
  "PipelineExperimentConfig": {
    "ExperimentName": {"Get": "Execution.PipelineName"},
    "TrialName": {"Get": "Execution.PipelineExecutionId"}
  },
  "Steps": [...]
}
```

# Disable Experiments Integration
<a name="pipelines-experiments-none"></a>

**Create a pipeline**

You can disable your pipeline's integration with SageMaker Experiments by setting the `pipeline_experiment_config` parameter to `None` when you define your pipeline. This way, SageMaker AI will not automatically create an experiment, run groups, or individual runs for tracking metrics and artifacts associated with your pipeline executions. The following example sets the pipeline config parameter to `None`.

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    pipeline_experiment_config=None,
    steps=[step_train]
)
```

**Pipeline definition file**

This is the same as the preceding default example, without the `PipelineExperimentConfig`.

# Specify a Custom Experiment Name
<a name="pipelines-experiments-custom-experiment"></a>

While the default behavior is to use the pipeline name as the experiment name in SageMaker Experiments, you can override this and specify a custom experiment name instead. This can be useful if you want to group multiple pipeline executions under the same experiment for easier analysis and comparison. The run group name will still default to the pipeline execution ID unless you explicitly set a custom name for that as well. The following section demonstrates how to create a pipeline with a custom experiment name while leaving the run group name as the default execution ID.

**Create a pipeline**

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
      "CustomExperimentName",
      ExecutionVariables.PIPELINE_EXECUTION_ID
    ),
    steps=[step_train]
)
```

**Pipeline definition file**

```
{
  ...,
  "PipelineExperimentConfig": {
    "ExperimentName": "CustomExperimentName",
    "TrialName": {"Get": "Execution.PipelineExecutionId"}
  },
  "Steps": [...]
}
```

# Specify a Custom Run Group Name
<a name="pipelines-experiments-custom-trial"></a>

In addition to setting a custom experiment name, you can also specify a custom name for the run groups created by SageMaker Experiments during pipeline executions. This name is appended with the pipeline execution ID to ensure uniqueness. You can specify a custom run group name to identify and analyze related pipeline runs within the same experiment. The following section shows how to define a pipeline with a custom run group name while using the default pipeline name for the experiment name.

**Create a pipeline**

```
pipeline_name = f"MyPipeline"
pipeline = Pipeline(
    name=pipeline_name,
    parameters=[...],
    pipeline_experiment_config=PipelineExperimentConfig(
      ExecutionVariables.PIPELINE_NAME,
      Join(on="-", values=["CustomTrialName", ExecutionVariables.PIPELINE_EXECUTION_ID])
    ),
    steps=[step_train]
)
```

**Pipeline definition file**

```
{
  ...,
  "PipelineExperimentConfig": {
    "ExperimentName": {"Get": "Execution.PipelineName"},
    "TrialName": {
      "On": "-",
      "Values": [
         "CustomTrialName",
         {"Get": "Execution.PipelineExecutionId"}
       ]
    }
  },
  "Steps": [...]
}
```

# Run pipelines using local mode
<a name="pipelines-local-mode"></a>

SageMaker Pipelines local mode is an easy way to test your training, processing and inference scripts, as well as the runtime compatibility of [pipeline parameters](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_building_pipeline.html#pipeline-parameters) before you execute your pipeline on the managed SageMaker AI service. By using local mode, you can test your SageMaker AI pipeline locally using a smaller dataset. This allows quick and easy debugging of errors in user scripts and the pipeline definition itself without incurring the costs of using the managed service. The following topic shows you how to define and run pipelines locally.

Pipelines local mode leverages [SageMaker AI jobs local mode](https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode) under the hood. This is a feature in the SageMaker Python SDK that allows you to run SageMaker AI built-in or custom images locally using Docker containers. Pipelines local mode is built on top of SageMaker AI jobs local mode. Therefore, you can expect to see the same results as if you were running those jobs separately. For example, local mode still uses Amazon S3 to upload model artifacts and processing outputs. If you want data generated by local jobs to reside on local disk, you can use the setup mentioned in [Local Mode](https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode).

Pipeline local mode currently supports the following step types:
+ [Training step](build-and-manage-steps-types.md#step-type-training)
+ [Processing step](build-and-manage-steps-types.md#step-type-processing)
+ [Transform step](build-and-manage-steps-types.md#step-type-transform)
+ [Model Step](https://docs.aws.amazon.com/sagemaker/latest/dg/build-and-manage-steps.html#step-type-model-create) (with Create Model arguments only)
+ [Condition step](build-and-manage-steps-types.md#step-type-condition)
+ [Fail step](build-and-manage-steps-types.md#step-type-fail)

As opposed to the managed Pipelines service which allows multiple steps to execute in parallel using [Parallelism Configuration](https://sagemaker.readthedocs.io/en/stable/workflows/pipelines/sagemaker.workflow.pipelines.html#parallelism-configuration), the local pipeline executor runs the steps sequentially. Therefore, overall execution performance of a local pipeline may be poorer than one that runs on the cloud - this mostly depends on the size of the dataset, algorithm, as well as the power of your local computer. Also note that Pipelines runs in local mode are not recorded in [SageMaker Experiments](https://docs.aws.amazon.com/sagemaker/latest/dg/pipelines-experiments.html).

**Note**  
Pipelines local mode is not compatible with SageMaker AI algorithms such as XGBoost. If you to want use these algorithms, you must use them in [script mode](https://sagemaker-examples.readthedocs.io/en/latest/sagemaker-script-mode/sagemaker-script-mode.html).

In order to execute a pipeline locally, the `sagemaker_session` fields associated with the pipeline steps and the pipeline itself need to be of type `LocalPipelineSession`. The following example shows how you can define a SageMaker AI pipeline to execute locally.

```
from sagemaker.workflow.pipeline_context import LocalPipelineSession
from sagemaker.pytorch import PyTorch
from sagemaker.workflow.steps import TrainingStep
from sagemaker.workflow.pipeline import Pipeline

local_pipeline_session = LocalPipelineSession()

pytorch_estimator = PyTorch(
    sagemaker_session=local_pipeline_session,
    role=sagemaker.get_execution_role(),
    instance_type="ml.c5.xlarge",
    instance_count=1,
    framework_version="1.8.0",
    py_version="py36",
    entry_point="./entry_point.py",
)

step = TrainingStep(
    name="MyTrainingStep",
    step_args=pytorch_estimator.fit(
        inputs=TrainingInput(s3_data="s3://amzn-s3-demo-bucket/my-data/train"),
    )
)

pipeline = Pipeline(
    name="MyPipeline",
    steps=[step],
    sagemaker_session=local_pipeline_session
)

pipeline.create(
    role_arn=sagemaker.get_execution_role(), 
    description="local pipeline example"
)

// pipeline will execute locally
execution = pipeline.start()

steps = execution.list_steps()

training_job_name = steps['PipelineExecutionSteps'][0]['Metadata']['TrainingJob']['Arn']

step_outputs = pipeline_session.sagemaker_client.describe_training_job(TrainingJobName = training_job_name)
```

Once you are ready to execute the pipeline on the managed SageMaker Pipelines service, you can do so by replacing `LocalPipelineSession` in the previous code snippet with `PipelineSession` (as shown in the following code sample) and rerunning the code.

```
from sagemaker.workflow.pipeline_context import PipelineSession

pipeline_session = PipelineSession()
```

# Troubleshooting Amazon SageMaker Pipelines
<a name="pipelines-troubleshooting"></a>

When using Amazon SageMaker Pipelines, you might run into issues for various reasons. This topic provides information about common errors and how to resolve them. 

 **Pipeline Definition Issues** 

Your pipeline definition might not be formatted correctly. This can result in your execution failing or your job being inaccurate. These errors can be caught when the pipeline is created or when an execution occurs. If your definition doesn’t validate, Pipelines returns an error message identifying the character where the JSON file is malformed. To fix this problem, review the steps created using the SageMaker AI Python SDK for accuracy. 

You can only include steps in a pipeline definition once. Because of this, steps cannot exist as part of a condition step *and* a pipeline in the same pipeline. 

 **Examining Pipeline Logs** 

You can view the status of your steps using the following command: 

```
execution.list_steps()
```

Each step includes the following information:
+ The ARN of the entity launched by the pipeline, such as SageMaker AI job ARN, model ARN, or model package ARN. 
+ The failure reason includes a brief explanation of the step failure.
+ If the step is a condition step, it includes whether the condition is evaluated to true or false.  
+ If the execution reuses a previous job execution, the `CacheHit` lists the source execution.  

You can also view the error messages and logs in the Amazon SageMaker Studio interface. For information about how to see the logs in Studio, see [View the details of a pipeline run](pipelines-studio-view-execution.md).

 **Missing Permissions** 

Correct permissions are required for the role that creates the pipeline execution, and the steps that create each of the jobs in your pipeline execution. Without these permissions, you may not be able to submit your pipeline execution or run your SageMaker AI jobs as expected. To ensure that your permissions are properly set up, see [IAM Access Management](build-and-manage-access.md). 

 **Job Execution Errors ** 

You may run into issues when executing your steps because of issues in the scripts that define the functionality of your SageMaker AI jobs. Each job has a set of CloudWatch logs. To view these logs from Studio, see [View the details of a pipeline run](pipelines-studio-view-execution.md). For information about using CloudWatch logs with SageMaker AI, see [CloudWatch Logs for Amazon SageMaker AI](logging-cloudwatch.md). 

 **Property File Errors** 

You may have issues when incorrectly implementing property files with your pipeline. To ensure that your implementation of property files works as expected, see [Pass Data Between Steps](build-and-manage-propertyfile.md). 

 **Issues copying the script to the container in the Dockerfile** 

You can either copy the script to the container or pass it via the `entry_point` argument (of your estimator entity) or `code` argument (of your processor entity), as demonstrated in the following code sample.

```
step_process = ProcessingStep(
    name="PreprocessAbaloneData",
    processor=sklearn_processor,
    inputs = [
        ProcessingInput(
            input_name='dataset',
            source=...,
            destination="/opt/ml/processing/code",
        )
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination = processed_data_path),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination = processed_data_path),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination = processed_data_path),
    ],
    code=os.path.join(BASE_DIR, "process.py"), ## Code is passed through an argument
    cache_config = cache_config,
    job_arguments = ['--input', 'arg1']
)

sklearn_estimator = SKLearn(
    entry_point=os.path.join(BASE_DIR, "train.py"), ## Code is passed through the entry_point
    framework_version="0.23-1",
    instance_type=training_instance_type,
    role=role,
    output_path=model_path, # New
    sagemaker_session=sagemaker_session, # New
    instance_count=1, # New
    base_job_name=f"{base_job_prefix}/pilot-train",
    metric_definitions=[
        {'Name': 'train:accuracy', 'Regex': 'accuracy_train=(.*?);'},
        {'Name': 'validation:accuracy', 'Regex': 'accuracy_validation=(.*?);'}
    ],
)
```