Preprocessing and Postprocessing
You can use custom preprocessing and postprocessing Python scripts to transform the input to Model Monitor or to extend the code after a successful monitoring run. Upload these scripts to Amazon S3 and reference them when you create your Model Monitor.
The following example shows how you can customize a monitoring schedule with preprocessing and postprocessing scripts. Replace the user placeholder text with your own information.
import os

import boto3
from sagemaker import get_execution_role, Session
from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor

# Upload the pre and postprocessor scripts to Amazon S3
session = Session()
bucket = boto3.Session().resource("s3").Bucket(session.default_bucket())
prefix = "demo-sagemaker-model-monitor"

# upload_file returns None, so build the S3 URIs explicitly
bucket.Object(os.path.join(prefix, "preprocessor.py")).upload_file("preprocessor.py")
pre_processor_script = "s3://{}/{}".format(bucket.name, os.path.join(prefix, "preprocessor.py"))

bucket.Object(os.path.join(prefix, "postprocessor.py")).upload_file("postprocessor.py")
post_processor_script = "s3://{}/{}".format(bucket.name, os.path.join(prefix, "postprocessor.py"))

# Get the execution role
role = get_execution_role()  # can be an empty string

# Instance type
instance_type = "instance-type"
# instance_type = "ml.m5.xlarge"  # Example

# Create a monitoring schedule with pre and postprocessing
my_default_monitor = DefaultModelMonitor(
    role=role,
    instance_count=1,
    instance_type=instance_type,
    volume_size_in_gb=20,
    max_runtime_in_seconds=3600,
)

s3_report_path = "s3://{}/{}".format(bucket.name, "reports")
monitor_schedule_name = "monitor-schedule-name"
endpoint_name = "endpoint-name"

my_default_monitor.create_monitoring_schedule(
    post_analytics_processor_script=post_processor_script,
    record_preprocessor_script=pre_processor_script,
    monitor_schedule_name=monitor_schedule_name,
    # Use endpoint_input for a real-time endpoint
    endpoint_input=endpoint_name,
    # or use batch_transform_input for batch transform jobs
    # batch_transform_input=batch_transform_name,
    output_s3_uri=s3_report_path,
    statistics=my_default_monitor.baseline_statistics(),
    constraints=my_default_monitor.suggested_constraints(),
    schedule_cron_expression=CronExpressionGenerator.hourly(),
    enable_cloudwatch_metrics=True,
)
Preprocessing Script
Use a preprocessing script when you need to transform the input to Model Monitor.

For example, suppose the output of your model is an array [1.0, 2.1]. The Amazon SageMaker Model Monitor container only works with tabular or flattened JSON structures, such as {"prediction0": 1.0, "prediction1": 2.1}. You can use a preprocessing script like the following example to transform the array into the correct JSON structure.
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    output_data = inference_record.endpoint_output.data.rstrip("\n")
    data = output_data + "," + input_data
    return {str(i).zfill(20): d for i, d in enumerate(data.split(","))}
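Before uploading the script, you can sanity-check the transformation locally by calling the handler with a stand-in record. In the following minimal sketch, the SimpleNamespace objects are hypothetical stand-ins for the endpoint_input and endpoint_output attributes that the container passes in; they are not part of the Model Monitor API:

from types import SimpleNamespace

# Hypothetical stand-in that mimics the attributes the container provides
mock_record = SimpleNamespace(
    endpoint_input=SimpleNamespace(data="132,25,113.2"),
    endpoint_output=SimpleNamespace(data="1.0,2.1\n"),
)

print(preprocess_handler(mock_record))
# {'00000000000000000000': '1.0', '00000000000000000001': '2.1',
#  '00000000000000000002': '132', '00000000000000000003': '25',
#  '00000000000000000004': '113.2'}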
As another example, suppose your model has optional features and you use -1 to denote that an optional feature has a missing value. If you have a data quality monitor, you may want to remove the -1 from the input value array so that it isn't included in the monitor's metric calculations. You can use a script like the following to remove those values.
def preprocess_handler(inference_record):
    input_data = inference_record.endpoint_input.data
    # Compare against the string "-1" because the CSV fields are strings
    return {i: None if x == "-1" else x for i, x in enumerate(input_data.split(","))}
Your preprocessing script receives an inference_record as its only input. The following code snippet shows an example of an inference_record.
{ "captureData": { "endpointInput": { "observedContentType": "text/csv", "mode": "INPUT", "data": "
132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1
", "encoding": "CSV" }, "endpointOutput": { "observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT", "data": "0.01076381653547287
", "encoding": "CSV" } }, "eventMetadata": { "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9
", "inferenceTime": "2019-11-20T23:33:12Z
" }, "eventVersion": "0
" }
The following code snippet shows the full class structure of an inference_record.
KEY_EVENT_METADATA = "eventMetadata" KEY_EVENT_METADATA_EVENT_ID = "eventId" KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime" KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes" KEY_EVENTDATA_ENCODING = "encoding" KEY_EVENTDATA_DATA = "data" KEY_GROUND_TRUTH_DATA = "groundTruthData" KEY_EVENTDATA = "captureData" KEY_EVENTDATA_ENDPOINT_INPUT = "endpointInput" KEY_EVENTDATA_ENDPOINT_OUTPUT = "endpointOutput" KEY_EVENTDATA_BATCH_OUTPUT = "batchTransformOutput" KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType" KEY_EVENTDATA_MODE = "mode" KEY_EVENT_VERSION = "eventVersion" class EventConfig: def __init__(self, endpoint, variant, start_time, end_time): self.endpoint = endpoint self.variant = variant self.start_time = start_time self.end_time = end_time class EventMetadata: def __init__(self, event_metadata_dict): self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None) self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None) self.custom_attribute = event_metadata_dict.get(KEY_EVENT_METADATA_CUSTOM_ATTR, None) class EventData: def __init__(self, data_dict): self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None) self.data = data_dict.get(KEY_EVENTDATA_DATA, None) self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None) self.mode = data_dict.get(KEY_EVENTDATA_MODE, None) def as_dict(self): ret = { KEY_EVENTDATA_ENCODING: self.encoding, KEY_EVENTDATA_DATA: self.data, KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType, } return ret class CapturedData: def __init__(self, event_dict): self.event_metadata = None self.endpoint_input = None self.endpoint_output = None self.batch_transform_output = None self.ground_truth = None self.event_version = None self.event_dict = event_dict self._event_dict_postprocessed = False if KEY_EVENT_METADATA in event_dict: self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA]) if KEY_EVENTDATA in event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in event_dict[KEY_EVENTDATA]: self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT]) if KEY_EVENTDATA_ENDPOINT_OUTPUT in event_dict[KEY_EVENTDATA]: self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_OUTPUT]) if KEY_EVENTDATA_BATCH_OUTPUT in event_dict[KEY_EVENTDATA]: self.batch_transform_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT]) if KEY_GROUND_TRUTH_DATA in event_dict: self.ground_truth = EventData(event_dict[KEY_GROUND_TRUTH_DATA]) if KEY_EVENT_VERSION in event_dict: self.event_version = event_dict[KEY_EVENT_VERSION] def as_dict(self): if self._event_dict_postprocessed is True: return self.event_dict if KEY_EVENTDATA in self.event_dict: if KEY_EVENTDATA_ENDPOINT_INPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_ENDPOINT_INPUT] = self.endpoint_input.as_dict() if KEY_EVENTDATA_ENDPOINT_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][ KEY_EVENTDATA_ENDPOINT_OUTPUT ] = self.endpoint_output.as_dict() if KEY_EVENTDATA_BATCH_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_BATCH_OUTPUT] = self.batch_transform_output.as_dict() self._event_dict_postprocessed = True return self.event_dict def __str__(self): return str(self.as_dict())
Custom Sampling
You can also apply a custom sampling strategy in your preprocessing script. To do this, configure Model Monitor's first-party, prebuilt container to ignore records according to your specified sampling rate. In the following example, the handler samples 10 percent of the records by returning the record in 10 percent of handler calls and an empty list otherwise.
import random

def preprocess_handler(inference_record):
    # We set up a sampling rate of 0.1
    if random.random() > 0.1:
        # Return an empty list to skip this record
        return []
    input_data = inference_record.endpoint_input.data
    # Compare against the string "-1" because the CSV fields are strings
    return {i: None if x == "-1" else x for i, x in enumerate(input_data.split(","))}
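Note that random.random() selects a different subset of records on each run, so reruns over the same captured data sample different records. If you need the sample to be reproducible, one option is to key the decision off the record's event ID instead. The following is a hedged sketch of that alternative, assuming each captured record carries eventMetadata with an eventId:

import zlib

def preprocess_handler(inference_record):
    # Deterministic ~10% sample: the same record is always kept or skipped
    event_id = inference_record.event_metadata.event_id or ""
    if zlib.crc32(event_id.encode("utf-8")) % 10 != 0:
        return []
    input_data = inference_record.endpoint_input.data
    return {i: None if x == "-1" else x for i, x in enumerate(input_data.split(","))}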
Custom Logging for Your Preprocessing Script
If your preprocessing script returns an error, check the exception message logged to CloudWatch to debug it. You can access the CloudWatch logger through the preprocess_handler interface, which lets you log any information you need from your script to CloudWatch. This is useful when debugging your preprocessing script. The following example shows how you can use the preprocess_handler interface to log to CloudWatch.
def preprocess_handler(inference_record, logger):
    logger.info(f"I'm a processing record: {inference_record}")
    logger.debug(f"I'm debugging a processing record: {inference_record}")
    logger.warning(f"I'm processing record with missing value: {inference_record}")
    logger.error(f"I'm a processing record with bad value: {inference_record}")
    return inference_record
Postprocessing Script
Use a postprocessing script if you want to extend the code following a successful monitoring run.
def postprocess_handler():
    print("Hello from post-proc script!")
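A postprocessing handler can do more than print. For example, it could publish a custom CloudWatch metric to signal that a monitoring run completed. The following is a hypothetical sketch using boto3; the namespace and metric name are made up for illustration, and it assumes the processing job's role allows cloudwatch:PutMetricData:

import boto3

def postprocess_handler():
    # Hypothetical: emit a heartbeat metric after a successful monitoring run.
    # The namespace and metric name are illustrative, not part of Model Monitor.
    cloudwatch = boto3.client("cloudwatch")
    cloudwatch.put_metric_data(
        Namespace="Custom/ModelMonitor",
        MetricData=[{"MetricName": "MonitoringRunCompleted", "Value": 1.0}],
    )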