Verwenden Sie SageMaker Verarbeitung für verteiltes Feature-Engineering von ML-Datensätzen im Terabyte-Bereich - AWS Prescriptive Guidance

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Verwenden Sie SageMaker Verarbeitung für verteiltes Feature-Engineering von ML-Datensätzen im Terabyte-Bereich

Erstellt von Bol Boom Hower (AWS)

Umgebung: Produktion

Technologien: Machine Learning und KI; Big Data

AWS-Services: Amazon SageMaker

Übersicht

Viele Datensätze im Terabyte-Bereich oder größer bestehen häufig aus einer hierarchischen Ordnerstruktur, und die Dateien im Datensatz teilen sich manchmal Abhängigkeiten. Aus diesem Grund müssen Techniker und Datenwissenschaftler für Machine Learning (ML) sorgfältige Designentscheidungen treffen, um solche Daten auf das Modelltraining und die Inferenz vorzubereiten. Dieses Muster zeigt, wie Sie manuelle Makro-Sharding- und Microsharding-Techniken in Kombination mit Amazon SageMaker Processing und virtueller CPU (vCPU)-Parallelisierung verwenden können, um Feature-Engineering-Prozesse für komplizierte Big-Data-ML-Datensätze effizient zu skalieren. 

Dieses Muster definiert Makro-Sharding als die Aufteilung von Datenverzeichnissen auf mehrere Maschinen zur Verarbeitung und Micro-Sharding als die Aufteilung von Daten auf jedem Computer auf mehrere Verarbeitungs-Threads. Das Muster demonstriert diese Techniken, indem Amazon SageMaker mit Beispieldatensätzen für Zeitreihen aus dem PhysioNet MIMIC--Datensatz verwendet wird. Durch die Implementierung der Techniken in diesem Muster können Sie die Verarbeitungszeit und die Kosten für Feature Engineering minimieren und gleichzeitig die Ressourcennutzung und Durchsatzeffizienz maximieren. Diese Optimierungen basieren auf verteilter SageMaker Verarbeitung auf Amazon Elastic Compute Cloud (Amazon EC2)-Instances und vCPUs für ähnliche, große Datensätze, unabhängig vom Datentyp.

Voraussetzungen und Einschränkungen

Voraussetzungen

  • Zugriff auf SageMaker Notebook-Instances oder SageMaker Studio, wenn Sie dieses Muster für Ihren eigenen Datensatz implementieren möchten. Wenn Sie Amazon SageMaker zum ersten Mal verwenden, finden Sie weitere Informationen unter Erste Schritte mit Amazon SageMaker in der AWS-Dokumentation.

  • SageMaker Studio, wenn Sie dieses Muster mit den PhysioNet MIMIC- Beispieldaten implementieren möchten. 

  • Das Muster verwendet SageMaker Verarbeitung, erfordert jedoch keine Erfahrung mit der Ausführung von SageMaker Verarbeitungsaufträgen.

Einschränkungen

  • Dieses Muster eignet sich gut für ML-Datensätze, die voneinander abhängige Dateien enthalten. Diese Abhängigkeiten profitieren am meisten vom manuellen Makro-Sharding und der parallelen Ausführung mehrerer Einzel-Instance- SageMaker Verarbeitungsaufträge. Für Datensätze, in denen solche Abhängigkeiten nicht vorhanden sind, ist das ShardedByS3Key Feature in SageMaker Processing möglicherweise eine bessere Alternative zu Makro-Sharding, da es Sharded-Daten an mehrere Instances sendet, die von demselben Verarbeitungsauftrag verwaltet werden. Sie können jedoch die Microsharding-Strategie dieses Musters in beiden Szenarien implementieren, um Instance-vCPUs optimal zu nutzen.

Produktversionen

  • Amazon SageMaker Python SDK Version 2

Architektur

Zieltechnologie-Stack

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon SageMaker

Zielarchitektur

Makro-Sharding und verteilte EC2-Instances

Die 10 parallelen Prozesse, die in dieser Architektur dargestellt werden, spiegeln die Struktur des MIMIC- Datensatzes wider. (Prozesse werden zur Vereinfachung des Diagramms durch Ellipsen dargestellt.) Eine ähnliche Architektur gilt für jeden Datensatz, wenn Sie manuelles Makro-Sharding verwenden. Im Fall von MIMIC- können Sie die Rohstruktur des Datensatzes zu Ihrem Vorteil verwenden, indem Sie jeden Ordner der Patientengruppe mit minimalem Aufwand separat verarbeiten. Im folgenden Diagramm wird der Datensatzgruppenblock auf der linken Seite angezeigt (1). Angesichts der verteilten Natur der Daten ist es sinnvoll, nach Patientengruppe zu fragmentieren.

Architektur für Microsharding und verteilte EC2-Instances

Das manuelle Sharding nach Patientengruppe bedeutet jedoch, dass für jeden Ordner der Patientengruppe ein separater Verarbeitungsauftrag erforderlich ist, wie Sie im mittleren Abschnitt des Diagramms (2) sehen können, anstatt einen einzelnen Verarbeitungsauftrag mit mehreren EC2-Instances. Da die Daten von MIMIC- sowohl binäre microSD-Dateien als auch übereinstimmende textbasierte Header-Dateien enthalten und eine erforderliche Abhängigkeit von der wfdb-Bibliothek für die binäre Datenextraktion besteht, müssen alle Datensätze für einen bestimmten Patienten auf derselben Instance verfügbar gemacht werden. Die einzige Möglichkeit, sicherzustellen, dass auch die zugehörige Header-Datei jeder binären microSD-Datei vorhanden ist, besteht darin, manuelles Sharding zu implementieren, um jeden Shard innerhalb seines eigenen Verarbeitungsauftrags auszuführen und anzugeben, s3_data_distribution_type='FullyReplicated' wann Sie die Eingabe für den Verarbeitungsauftrag definieren. Wenn alternativ alle Daten in einem einzigen Verzeichnis verfügbar waren und zwischen Dateien keine Abhängigkeiten vorlagen, kann es sinnvoller sein, einen einzelnen Verarbeitungsauftrag mit mehreren EC2-Instances zu starten und s3_data_distribution_type='ShardedByS3Key' anzugeben. Die Angabe von ShardedByS3Key als Amazon S3-Datenverteilungstyp weist SageMaker an, Daten-Sharding automatisch über Instances hinweg zu verwalten. 

Das Starten eines Verarbeitungsauftrags für jeden Ordner ist eine kosteneffiziente Methode zur Vorverarbeitung der Daten, da das gleichzeitige Ausführen mehrerer Instances Zeit spart. Für zusätzliche Kosten und Zeiteinsparungen können Sie Microsharding in jedem Verarbeitungsauftrag verwenden. 

Microsharding und parallele vCPUs

Innerhalb jedes Verarbeitungsauftrags werden die gruppierten Daten weiter unterteilt, um die Nutzung aller verfügbaren vCPUs auf der SageMaker vollständig verwalteten EC2-Instance zu maximieren. Die Blöcke im mittleren Abschnitt des Diagramms (2) zeigen, was innerhalb jedes primären Verarbeitungsauftrags passiert. Der Inhalt der Ordner für Patientendaten wird je nach Anzahl der verfügbaren vCPUs auf der Instance abgeflacht und gleichmäßig aufgeteilt. Nachdem der Inhalt des Ordners aufgeteilt wurde, wird der gleich große Satz von Dateien zur Verarbeitung auf alle vCPUs verteilt. Wenn die Verarbeitung abgeschlossen ist, werden die Ergebnisse jeder vCPU für jeden Verarbeitungsauftrag zu einer einzigen Datendatei zusammengefasst. 

Im angehängten Code werden diese Konzepte im folgenden Abschnitt der -src/feature-engineering-pass1/preprocessing.pyDatei dargestellt.

def chunks(lst, n):     """     Yield successive n-sized chunks from lst.          :param lst: list of elements to be divided     :param n: number of elements per chunk     :type lst: list     :type n: int     :return: generator comprising evenly sized chunks     :rtype: class 'generator'     """     for i in range(0, len(lst), n):         yield lst[i:i + n]     # Generate list of data files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']   # Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))   # Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)   # Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))

Eine Funktion, chunks, ist zunächst so definiert, dass sie eine bestimmte Liste verbraucht, indem sie sie in gleichmäßig dimensionierte Blöcke der Länge unterteilt und diese Ergebnisse als Generator zurückgibt. Als Nächstes werden die Daten in allen Patientenordnern abgeflacht, indem eine Liste aller vorhandenen binärenTelefoniedateien kompiliert wird. Danach wird die Anzahl der auf der EC2-Instance verfügbaren vCPUs abgerufen. Die Liste der binären microSD-Dateien wird gleichmäßig auf diese vCPUs aufgeteiltchunks, indem aufgerufen wird. Anschließend wird jede microSD-Unterliste mithilfe der parallelen Klasse von joblib auf ihrer eigenen vCPU verarbeitet. Die Ergebnisse werden vom Verarbeitungsauftrag automatisch zu einer einzigen Liste von Datenrahmen kombiniert, die SageMaker dann weiter verarbeitet, bevor sie nach Abschluss des Auftrags in Amazon S3 geschrieben werden. In diesem Beispiel werden von den Verarbeitungsaufträgen 10 Dateien in Amazon S3 geschrieben (eine für jeden Auftrag).

Wenn alle anfänglichen Verarbeitungsaufträge abgeschlossen sind, kombiniert ein sekundärer Verarbeitungsauftrag, der im -Block rechts neben dem Diagramm angezeigt wird (3), die von jedem primären Verarbeitungsauftrag erzeugten Ausgabedateien und schreibt die kombinierte Ausgabe in Amazon S3 (4).

Tools

Tools

  • Python – Der für dieses Muster verwendete Beispielcode ist Python (Version 3).

  • SageMaker Studio – Amazon SageMaker Studio ist eine webbasierte, integrierte Entwicklungsumgebung (IDE) für Machine Learning, mit der Sie Ihre Machine-Learning-Modelle erstellen, trainieren, debuggen, bereitstellen und überwachen können. Sie führen SageMaker Verarbeitungsaufträge mithilfe von Jupyter-Notebooks in SageMaker Studio aus.

  • SageMaker Verarbeitung – Amazon SageMaker Processing bietet eine vereinfachte Möglichkeit, Ihre Datenverarbeitungs-Workloads auszuführen. In diesem Muster wird der Feature-Engineering-Code mithilfe von SageMaker Verarbeitungsaufträgen in großem Umfang implementiert.

Code

Die angehängte ZIP-Datei stellt den vollständigen Code für dieses Muster bereit. Im folgenden Abschnitt werden die Schritte zum Erstellen der Architektur für dieses Muster beschrieben. Jeder Schritt wird durch Beispielcode aus dem Anhang veranschaulicht.

Polen

AufgabeBeschreibungErforderliche Fähigkeiten
Greifen Sie auf Amazon SageMaker Studio zu.

Integrieren Sie SageMaker Studio in Ihrem AWS-Konto, indem Sie den Anweisungen in der Amazon- SageMaker Dokumentation folgen.

Datenwissenschaftler, ML-Techniker
Installieren Sie das Dienstprogramm wget.

Installieren Sie wget, wenn Sie sich mit einer neuen SageMaker Studio-Konfiguration integriert haben oder diese Dienstprogramme noch nie in SageMaker Studio verwendet haben. 

Öffnen Sie zum Installieren von ein Terminalfenster in der SageMaker Studio-Konsole und führen Sie den folgenden Befehl aus:

sudo yum install wget
Datenwissenschaftler, ML-Techniker
Laden Sie den Beispielcode herunter und entpacken Sie ihn.

Laden Sie die attachments.zip Datei im Abschnitt Anhänge herunter. Navigieren Sie in einem Terminalfenster zu dem Ordner, in den Sie die Datei heruntergeladen haben, und extrahieren Sie deren Inhalt:

unzip attachment.zip

Navigieren Sie zu dem Ordner, in den Sie die ZIP-Datei extrahiert haben, und extrahieren Sie den Inhalt der Scaled-Processing.zip Datei.

unzip Scaled-Processing.zip
Datenwissenschaftler, ML-Techniker
Laden Sie den Beispieldatensatz von physionet.org herunter und laden Sie ihn in Amazon S3 hoch.

Führen Sie das get_data.ipynb Jupyter-Notebook in dem Ordner aus, der die Scaled-Processing Dateien enthält. Dieses Notebook lädt einen MIMIC--Beispieldatensatz von physionet.org herunter und lädt ihn in Ihren SageMaker Studio-Sitzungs-Bucket in Amazon S3 hoch.

Datenwissenschaftler, ML-Techniker
AufgabeBeschreibungErforderliche Fähigkeiten
Verflacht die Dateihierarchie in allen Unterverzeichnissen.

In großen Datensätzen wie MIMIC- werden Dateien häufig über mehrere Unterverzeichnisse verteilt, auch innerhalb einer logischen übergeordneten Gruppe. Ihr Skript sollte so konfiguriert sein, dass alle Gruppendateien in allen Unterverzeichnissen reduziert werden, wie der folgende Code zeigt.

# Generate list of .dat files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']

Beachten Sie, dass die Beispielcodeausschnitte in diesem Epics aus der src/feature-engineering-pass1/preprocessing.py Datei stammen, die in der Anfügung bereitgestellt wird.

Datenwissenschaftler, ML-Techniker
Teilen Sie Dateien basierend auf der vCPU-Anzahl in Untergruppen auf.

Dateien sollten in gleich große Untergruppen oder Blöcke unterteilt werden, je nach Anzahl der vCPUs, die auf der Instance vorhanden sind, auf der das Skript ausgeführt wird. In diesem Schritt können Sie Code ähnlich dem folgenden implementieren.

# Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))
Datenwissenschaftler, ML-Techniker
Parallelisieren Sie die Verarbeitung von Untergruppen über vCPUs hinweg.

Die Skriptlogik sollte so konfiguriert werden, dass alle Untergruppen parallel verarbeitet werden. Verwenden Sie dazu die Parallel Klasse und delayed Methode der Joblib-Bibliothek wie folgt. 

# Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
Datenwissenschaftler, ML-Techniker
Speichern Sie die Ausgabe einer einzelnen Dateigruppe in Amazon S3.

Wenn die parallele vCPU-Verarbeitung abgeschlossen ist, sollten die Ergebnisse jeder vCPU kombiniert und in den S3-Bucket-Pfad der Dateigruppe hochgeladen werden. Für diesen Schritt können Sie Code ähnlich dem folgenden verwenden.

# Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))
Datenwissenschaftler, ML-Techniker
AufgabeBeschreibungErforderliche Fähigkeiten
Kombinieren Sie Datendateien, die über alle Verarbeitungsaufträge hinweg erzeugt wurden, auf denen das erste Skript ausgeführt wurde.

Das vorherige Skript gibt für jeden SageMaker Verarbeitungsauftrag, der eine Gruppe von Dateien aus dem Datensatz verarbeitet, eine einzelne Datei aus.  Als Nächstes müssen Sie diese Ausgabedateien zu einem einzigen Objekt kombinieren und einen einzelnen Ausgabedatensatz in Amazon S3 schreiben. Dies wird in der -src/feature-engineering-pass1p5/preprocessing.pyDatei, die im Anhang bereitgestellt wird, wie folgt demonstriert.

def write_parquet(wavs_df, path):     """     Write waveform summary dataframe to S3 in parquet format.          :param wavs_df: waveform summary dataframe     :param path: S3 directory prefix     :type wavs_df: pandas dataframe     :type path: str     :return: None     """     extra_args = {"ServerSideEncryption": "aws:kms"}     wr.s3.to_parquet(         df=wavs_df,         path=path,         compression='snappy',         s3_additional_kwargs=extra_args)     def combine_data():     """     Get combined data and write to parquet.          :return: waveform summary dataframe     :rtype: pandas dataframe     """     wavs_df = get_data()     wavs_df = normalize_signal_names(wavs_df)     write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))       return wavs_df     wavs_df = combine_data()
Datenwissenschaftler, ML-Techniker
AufgabeBeschreibungErforderliche Fähigkeiten
Führen Sie den ersten Verarbeitungsauftrag aus.

Um Makrosharding durchzuführen, führen Sie für jede Dateigruppe einen separaten Verarbeitungsauftrag aus. Microsharding wird innerhalb jedes Verarbeitungsauftrags durchgeführt, da jeder Auftrag Ihr erstes Skript ausführt. Der folgende Code zeigt, wie ein Verarbeitungsauftrag für jedes Dateigruppenverzeichnis im folgenden Ausschnitt gestartet wird (in enthaltennotebooks/FeatExtract_Pass1.ipynb).

pat_groups = list(range(30,40)) ts = str(int(time.time()))   for group in pat_groups:     sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                      role=role,                                      instance_type='ml.m5.4xlarge',                                      instance_count=1,                                      volume_size_in_gb=5)     sklearn_processor.run(         code='../src/feature-engineering-pass1/preprocessing.py',         job_name='-'.join(['scaled-processing-p1', str(group), ts]),         arguments=[             "input_path", "/opt/ml/processing/input",             "output_path", "/opt/ml/processing/output",             "group_data_out", "ws_df_group.json"         ],         inputs=         [             ProcessingInput(                 source=f's3://{sess.default_bucket()}/data_inputs/{group}',                 destination='/opt/ml/processing/input',                 s3_data_distribution_type='FullyReplicated'             )         ],         outputs=         [             ProcessingOutput(                 source='/opt/ml/processing/output',                 destination=f's3://{sess.default_bucket()}/data_outputs/{group}'             )         ],         wait=False     )
Datenwissenschaftler, ML-Techniker
Führen Sie den zweiten Verarbeitungsauftrag aus.

Um die vom ersten Satz von Verarbeitungsaufträgen generierten Ausgaben zu kombinieren und zusätzliche Berechnungen für die Vorverarbeitung durchzuführen, führen Sie Ihr zweites Skript mithilfe eines einzelnen SageMaker Verarbeitungsauftrags aus. Der folgende Code zeigt dies (in enthaltennotebooks/FeatExtract_Pass1p5.ipynb).

ts = str(int(time.time())) bucket = sess.default_bucket()       sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                  role=role,                                  instance_type='ml.t3.2xlarge',                                  instance_count=1,                                  volume_size_in_gb=5) sklearn_processor.run(     code='../src/feature-engineering-pass1p5/preprocessing.py',     job_name='-'.join(['scaled-processing', 'p1p5', ts]),     arguments=['bucket', bucket,                'pass1out_prefix', 'data_outputs',                'pass1out_data', 'ws_df_group.json',                'pass1p5out_data', 'waveform_summary.parquet',                'statsdata_name', 'signal_stats.csv'],     wait=True )
Datenwissenschaftler, ML-Techniker

Zugehörige Ressourcen

Anlagen

Um auf zusätzliche Inhalte zuzugreifen, die diesem Dokument zugeordnet sind, entpacken Sie die folgende Datei: attachment.zip