Prétraitement et post-traitement - Amazon SageMaker

Prétraitement et post-traitement

En plus d'utiliser les mécanismes préconçus, vous pouvez étendre le code avec les scripts de prétraitement et de post-traitement.

Script de post-traitement

Vous pouvez étendre le code à l'aide du script de post-traitement en suivant ce contrat :

def postprocess_handler(): print("Hello from post-proc script!")

Spécifiez-le comme un chemin d'accès dans Amazon Simple Storage Service (Amazon S3), dans la requête CreateMonitoringSchedule, comme illustré ci-dessous :

.MonitoringAppSpecification.PostAnalyticsProcessorSourceUri.

Script de prétraitement

Le conteneur Amazon SageMaker Model Monitor fonctionne uniquement avec les structures JSON tabulaires ou plates. Nous fournissons un préprocesseur par enregistrement pour certaines modifications mineures nécessaires à la transformation de l'ensemble de données. Par exemple, si votre sortie est un tableau [1.0, 2.1], vous devez le convertir en une structure JSON aplatie, comme {“prediction0” : 1.0, “prediction1” : 2.1"}. Voici à quoi peut ressembler un exemple de mise en œuvre :

def preprocess_handler(inference_record): input_data = inference_record.endpoint_input.data output_data = inference_record.endpoint_output.data.rstrip("\n") data = output_data + "," + input_data return { str(i).zfill(20) : d for i, d in enumerate(data.split(",")) }

Spécifiez-le en tant que chemin d'accès dans Amazon S3, dans la requête CreateMonitoringSchedule :

.MonitoringAppSpecification.RecordPreprocessorSourceUri.

La structure inference_record est définie comme suit :

KEY_EVENT_METADATA = "eventMetadata" KEY_EVENT_METADATA_EVENT_ID = "eventId" KEY_EVENT_METADATA_EVENT_TIME = "inferenceTime" KEY_EVENT_METADATA_CUSTOM_ATTR = "customAttributes" KEY_EVENTDATA = "captureData" KEY_EVENTDATA_INPUT = "endpointInput" KEY_EVENTDATA_OUTPUT = "endpointOutput" KEY_EVENTDATA_ENCODING = "encoding" KEY_EVENTDATA_DATA = "data" KEY_EVENTDATA_OBSERVED_CONTENT_TYPE = "observedContentType" KEY_EVENTDATA_MODE = "mode" KEY_EVENT_VERSION = "eventVersion" """ { "captureData": { "endpointInput": { "observedContentType": "text/csv", "mode": "INPUT", "data": "132,25,113.2,96,269.9,107,,0,0,0,0,0,0,1,0,1,0,0,1", "encoding": "CSV" }, "endpointOutput": { "observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT", "data": "0.01076381653547287", "encoding": "CSV" } }, "eventMetadata": { "eventId": "feca1ab1-8025-47e3-8f6a-99e3fdd7b8d9", "inferenceTime": "2019-11-20T23:33:12Z" }, "eventVersion": "0" } """ class EventConfig: def __init__(self, endpoint, variant, start_time, end_time): self.endpoint = endpoint self.variant = variant self.start_time = start_time self.end_time = end_time class EventMetadata: def __init__(self, event_metadata_dict): self.event_id = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_ID, None) self.event_time = event_metadata_dict.get(KEY_EVENT_METADATA_EVENT_TIME, None) self.custom_attribute = event_metadata_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None) class EventData: def __init__(self, data_dict): self.encoding = data_dict.get(KEY_EVENTDATA_ENCODING, None) self.data = data_dict.get(KEY_EVENTDATA_DATA, None) self.observedContentType = data_dict.get(KEY_EVENTDATA_OBSERVED_CONTENT_TYPE, None) self.mode = data_dict.get(KEY_EVENTDATA_MODE, None) def as_dict(self): ret = { KEY_EVENTDATA_ENCODING: self.encoding, KEY_EVENTDATA_DATA: self.data, KEY_EVENTDATA_OBSERVED_CONTENT_TYPE: self.observedContentType, } return ret class CapturedData: def __init__(self, event_dict): self.event_metadata = None self.endpoint_input = None self.endpoint_output = None self.event_version = None self.event_dict = event_dict self._event_dict_postprocessed = False if KEY_EVENT_METADATA in event_dict: self.event_metadata = EventMetadata(event_dict[KEY_EVENT_METADATA]) if KEY_EVENTDATA in event_dict: if KEY_EVENTDATA_INPUT in event_dict[KEY_EVENTDATA]: self.endpoint_input = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_INPUT]) if KEY_EVENTDATA_OUTPUT in event_dict[KEY_EVENTDATA]: self.endpoint_output = EventData(event_dict[KEY_EVENTDATA][KEY_EVENTDATA_OUTPUT]) if KEY_EVENT_VERSION in event_dict: self.event_version = event_dict[KEY_EVENT_VERSION] def as_dict(self): if self._event_dict_postprocessed is True: return self.event_dict if KEY_EVENTDATA in self.event_dict: if KEY_EVENTDATA_INPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][KEY_EVENTDATA_INPUT] = self.endpoint_input.as_dict() if KEY_EVENTDATA_OUTPUT in self.event_dict[KEY_EVENTDATA]: self.event_dict[KEY_EVENTDATA][ KEY_EVENTDATA_OUTPUT ] = self.endpoint_output.as_dict() self._event_dict_postprocessed = True return self.event_dict