Schedule monitoring jobs
Amazon SageMaker Model Monitor provides the ability to monitor the data collected from your real-time endpoints. You can monitor your data on a recurring schedule, or you can monitor it one time, immediately. You can create a monitoring schedule with the CreateMonitoringSchedule API.
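If you call the API directly, the following is a minimal sketch using the low-level boto3 client. It assumes you have already created a data quality monitoring job definition; the schedule and job definition names here are placeholders.

import boto3

# Minimal sketch of the low-level call. The SageMaker Python SDK examples
# later in this topic wrap this API for you. The names below are placeholders.
sm_client = boto3.client("sagemaker")

sm_client.create_monitoring_schedule(
    MonitoringScheduleName="my-monitoring-schedule",
    MonitoringScheduleConfig={
        "ScheduleConfig": {"ScheduleExpression": "cron(0 * ? * * *)"},  # hourly
        "MonitoringJobDefinitionName": "my-data-quality-job-definition",
        "MonitoringType": "DataQuality",
    },
)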
With a monitoring schedule, SageMaker can start processing jobs to analyze the data collected during a given period. In the processing job, SageMaker compares the dataset for the current analysis with the baseline statistics and constraints that you provide, and then generates a violations report. In addition, CloudWatch metrics are emitted for each feature under analysis.
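After an analysis runs, you can retrieve the most recent violations report through the SageMaker Python SDK. A minimal sketch, assuming a monitor object such as the my_default_monitor used in the examples below:

# Sketch: fetch and print the latest constraint violations, if any.
violations = my_default_monitor.latest_monitoring_constraint_violations()
for violation in violations.body_dict["violations"]:
    print(violation["feature_name"], violation["constraint_check_type"])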
SageMaker provides a prebuilt container for performing analysis on tabular datasets. Alternatively, you can bring your own container, as outlined in the Support for Your Own Containers With Amazon SageMaker Model Monitor topic.
You can create a model monitoring schedule for your real-time endpoint or batch transform job. Use the baseline resources (constraints and statistics) to compare against the real-time traffic or batch job inputs.
Example baseline assignments
In the following example, the training dataset used to train the model was uploaded to Amazon S3. If you already have it in Amazon S3, you can point to it directly.
# Copy over the training dataset to Amazon S3
# (if you already have it in Amazon S3, you could reuse it).
baseline_prefix = prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(bucket, baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix)

print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))
import os
import boto3

# Upload the training dataset to the baseline data location.
training_data_file = open("test_data/training-dataset-with-header.csv", 'rb')
s3_key = os.path.join(baseline_prefix, 'data', 'training-dataset-with-header.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)
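With the training data in Amazon S3, you can run a baselining job to generate the statistics and constraints that the monitoring schedules below compare against. The following sketch assumes role holds your SageMaker execution role ARN, and it creates the my_default_monitor object that the scheduling examples use:

from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

# Sketch of the baselining step; the instance settings are illustrative.
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type='ml.m5.xlarge',
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
    baseline_dataset=baseline_data_uri + '/training-dataset-with-header.csv',
    dataset_format=DatasetFormat.csv(header=True),
    output_s3_uri=baseline_results_uri,
    wait=True,
)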
Example schedule for recurring analysis
If you are scheduling a model monitor for a real-time endpoint, use the baseline constraints and statistics to compare against real-time traffic. The following code snippet shows the general format you use to schedule a model monitor for a real-time endpoint. This example schedules the model monitor to run hourly.
from sagemaker.model_monitor import CronExpressionGenerator, EndpointInput
from time import gmtime, strftime

mon_schedule_name = 'my-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=EndpointInput(
        endpoint_name=endpoint_name,
        destination="/opt/ml/processing/input/endpoint"
    ),
    post_analytics_processor_script=s3_code_postprocessor_uri,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
Example schedule for one-time analysis
You can also schedule the analysis to run once, without recurring, by passing arguments like the following to the create_monitoring_schedule method:
schedule_cron_expression=CronExpressionGenerator.now(),
data_analysis_start_time="-PT1H",
data_analysis_end_time="-PT0H",
In these arguments, the schedule_cron_expression parameter schedules the analysis to run once, immediately, with the value CronExpressionGenerator.now(). For any schedule with this setting, the data_analysis_start_time and data_analysis_end_time parameters are required. These parameters set the start time and end time of an analysis window. Define these times as offsets that are relative to the current time, and use ISO 8601 duration format. In this example, the times -PT1H and -PT0H define a window between one hour in the past and the current time. With this schedule, the analysis evaluates only the data that was collected during the specified window.
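Putting these arguments together, a one-time analysis of the last hour of captured data might look like the following sketch, which reuses the monitor and endpoint input from the recurring example:

mon_schedule_name = 'my-one-time-monitor-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    endpoint_input=EndpointInput(
        endpoint_name=endpoint_name,
        destination="/opt/ml/processing/input/endpoint"
    ),
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    # Run once, immediately, over the window from one hour ago to now.
    schedule_cron_expression=CronExpressionGenerator.now(),
    data_analysis_start_time="-PT1H",
    data_analysis_end_time="-PT0H",
    enable_cloudwatch_metrics=True,
)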
Example schedule for a batch transform job
The following code snippet shows the general format you use to schedule a model monitor for a batch transform job.
from sagemaker.model_monitor import (
    CronExpressionGenerator,
    BatchTransformInput,
    MonitoringDatasetFormat,
)
from time import gmtime, strftime

mon_schedule_name = 'my-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
    monitor_schedule_name=mon_schedule_name,
    batch_transform_input=BatchTransformInput(
        destination="/opt/ml/processing/input",
        data_captured_destination_s3_uri=s3_capture_upload_path,
        dataset_format=MonitoringDatasetFormat.csv(header=False),
    ),
    post_analytics_processor_script=s3_code_postprocessor_uri,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
After you create the schedule, you can check its status:

desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))
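You can also inspect the executions that a schedule has launched. A short sketch:

# Sketch: list the monitoring executions started by the schedule and
# check the status of the most recent one.
executions = my_default_monitor.list_executions()
if executions:
    latest_execution = executions[-1]
    print(latest_execution.describe()['ProcessingJobStatus'])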