Schedule monitoring jobs
Amazon SageMaker Model Monitor provides you the ability to monitor the data collected from your real-time
endpoints on a continuous schedule or from your batch transform jobs on a specific
schedule. You can create a monitoring schedule with the CreateMonitoringSchedule
API with a predefined periodic
interval. For example, every x hours (x can range from 1 to 23).
With a monitoring schedule, SageMaker can kick off processing jobs at a specified frequency to analyze the data collected during a given period. SageMaker provides a prebuilt container for performing analysis on tabular datasets. In the processing job, SageMaker compares the dataset for the current analysis with the baseline statistics, constraints provided and generate a violations report. In addition, CloudWatch metrics are emitted for each feature under analysis. Alternatively, you could choose to bring your own container as outlined in the Bring Your Own Containers topic.
You can create a model monitoring schedule for your real-time endpoint or batch transform job. Use the baseline resources (constraints and statistics) to compare against the real-time traffic or batch job inputs. In the following example, the training dataset used to train the model was uploaded to Amazon S3. If you already have it in Amazon S3, you can point to it directly.
# copy over the training dataset to Amazon S3 (if you already have it in Amazon S3, you could reuse it) baseline_prefix = prefix + '/baselining' baseline_data_prefix = baseline_prefix + '/data' baseline_results_prefix = baseline_prefix + '/results' baseline_data_uri = 's3://{}/{}'.format(bucket,baseline_data_prefix) baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix) print('Baseline data uri: {}'.format(baseline_data_uri)) print('Baseline results uri: {}'.format(baseline_results_uri))
training_data_file = open("test_data/training-dataset-with-header.csv", 'rb') s3_key = os.path.join(baseline_prefix, 'data', 'training-dataset-with-header.csv') boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)
If you are scheduling a model monitor for a real-time endpoint, use the baseline constraints and statistics to compare against real-time traffic. The following code snippet shows the general format you use to schedule a model monitor for a real-time endpoint.
from sagemaker.model_monitor import CronExpressionGenerator from time import gmtime, strftime mon_schedule_name = 'DEMO-xgb-churn-pred-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime()) my_default_monitor.create_monitoring_schedule( monitor_schedule_name=mon_schedule_name, endpoint_input=EndpointInput( endpoint_name=endpoint_name, destination="/opt/ml/processing/input/endpoint" ), post_analytics_processor_script=s3_code_postprocessor_uri, output_s3_uri=s3_report_path, statistics=my_default_monitor.baseline_statistics(), constraints=my_default_monitor.suggested_constraints(), schedule_cron_expression=CronExpressionGenerator.hourly(), enable_cloudwatch_metrics=True, )
The following code snippet shows the general format you use to schedule a model monitor for a batch transform job.
from sagemaker.model_monitor import ( CronExpressionGenerator, BatchTransformInput, MonitoringDatasetFormat, ) from time import gmtime, strftime mon_schedule_name = 'DEMO-xgb-churn-pred-model-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime()) my_default_monitor.create_monitoring_schedule( monitor_schedule_name=mon_schedule_name, batch_transform_input=BatchTransformInput( destination="opt/ml/processing/input", data_captured_destination_s3_uri=s3_capture_upload_path, dataset_format=MonitoringDatasetFormat.csv(header=False), ) post_analytics_processor_script=s3_code_postprocessor_uri, output_s3_uri=s3_report_path, statistics=my_default_monitor.baseline_statistics(), constraints=my_default_monitor.suggested_constraints(), schedule_cron_expression=CronExpressionGenerator.hourly(), enable_cloudwatch_metrics=True, )
Describe and inspect the schedule: After you describe it, observe that the
MonitoringScheduleStatus
in MonitoringScheduleSummary
returned by the ListMonitoringSchedules
API changes to
Scheduled
.
desc_schedule_result = my_default_monitor.describe_schedule() print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))