Utiliser SageMaker le traitement pour l'ingénierie des fonctionnalités distribuées d'ensembles de données ML à l'échelle du téraoctet - Recommandations AWS

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utiliser SageMaker le traitement pour l'ingénierie des fonctionnalités distribuées d'ensembles de données ML à l'échelle du téraoctet

Créée par Chris Boomhower () AWS

Environnement : Production

Technologies : apprentissage automatique et intelligence artificielle ; mégadonnées

AWSservices : Amazon SageMaker

Récapitulatif

De nombreux ensembles de données de plusieurs téraoctets ou plus se composent souvent d'une structure de dossiers hiérarchique, et les fichiers du jeu de données partagent parfois des interdépendances. C'est pourquoi les ingénieurs en apprentissage automatique (ML) et les scientifiques des données doivent prendre des décisions de conception réfléchies afin de préparer ces données pour l'entraînement et l'inférence des modèles. Ce modèle montre comment vous pouvez utiliser des techniques manuelles de macrosharding et de microsharding en combinaison avec Amazon SageMaker Processing et la parallélisation virtuelle CPU (vCPU) pour adapter efficacement les processus d'ingénierie des fonctionnalités aux ensembles de données Big Data ML complexes. 

Ce modèle définit le macrosharding comme la division de répertoires de données sur plusieurs machines pour le traitement, et le microsharding comme le partage des données de chaque machine sur plusieurs threads de traitement. Le modèle illustre ces techniques en utilisant Amazon SageMaker avec des exemples d'enregistrements de formes d'onde de séries chronologiques provenant du jeu de données PhysioNet MIMIC- III. En mettant en œuvre les techniques de ce modèle, vous pouvez minimiser le temps de traitement et les coûts liés à l'ingénierie des fonctionnalités tout en maximisant l'utilisation des ressources et l'efficacité du débit. Ces optimisations reposent sur le SageMaker traitement distribué sur des instances Amazon Elastic Compute Cloud EC2 (Amazon) et vCPUs pour des ensembles de données volumineux similaires, quel que soit le type de données.

Conditions préalables et limitations

Prérequis

  • Accès aux instances de SageMaker bloc-notes ou à SageMaker Studio, si vous souhaitez implémenter ce modèle pour votre propre ensemble de données. Si vous utilisez Amazon SageMaker pour la première fois, consultez la section Commencer avec Amazon SageMaker dans la AWS documentation.

  • SageMaker Studio, si vous souhaitez implémenter ce modèle avec les données PhysioNet MIMIC- III sample. 

  • Le modèle utilise le SageMaker traitement, mais ne nécessite aucune expérience dans l'exécution de tâches SageMaker de traitement.

Limites

  • Ce modèle convient parfaitement aux ensembles de données ML qui incluent des fichiers interdépendants. Ces interdépendances tirent le meilleur parti du macrosharding manuel et de l'exécution en parallèle de plusieurs SageMaker tâches de traitement en instance unique. Pour les ensembles de données où de telles interdépendances n'existent pas, la ShardedByS3Key fonctionnalité de SageMaker Processing peut constituer une meilleure alternative au macrosharding, car elle envoie des données fragmentées à plusieurs instances gérées par la même tâche de traitement. Cependant, vous pouvez implémenter la stratégie de microsharding de ce modèle dans les deux scénarios afin d'utiliser au mieux l'instance. vCPUs

Versions du produit

  • Amazon SageMaker Python SDK version 2

Architecture

Pile technologique cible

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon SageMaker

Architecture cible

Macrosharding et instances distribuées EC2

Les 10 processus parallèles représentés dans cette architecture reflètent la structure du MIMIC jeu de III données. (Les processus sont représentés par des ellipses pour simplifier les diagrammes.) Une architecture similaire s'applique à n'importe quel ensemble de données lorsque vous utilisez le macrosharding manuel. Dans le cas de MIMIC -III, vous pouvez utiliser la structure brute de l'ensemble de données à votre avantage en traitant chaque dossier de groupe de patients séparément, avec un minimum d'effort. Dans le schéma suivant, le bloc des groupes d'enregistrements apparaît sur la gauche (1). Étant donné la nature distribuée des données, il est logique de les partager par groupe de patients.

Architecture pour le microsharding et les instances distribuées EC2

Cependant, le découpage manuel par groupe de patients signifie qu'une tâche de traitement distincte est requise pour chaque dossier de groupe de patients, comme vous pouvez le voir dans la partie centrale du diagramme (2), au lieu d'une tâche de traitement unique avec plusieurs EC2 instances. Étant donné que les données III de MIMIC - incluent à la fois des fichiers de formes d'onde binaires et des fichiers d'en-tête textuels correspondants, et que l'extraction de données binaires nécessite une dépendance à la bibliothèque wfdb, tous les dossiers d'un patient spécifique doivent être disponibles sur la même instance. La seule façon de s'assurer que le fichier d'en-tête associé à chaque fichier de forme d'onde binaire est également présent est d'implémenter le sharding manuel pour exécuter chaque partition dans le cadre de sa propre tâche de traitement, et de spécifier s3_data_distribution_type='FullyReplicated' quand vous définissez l'entrée de la tâche de traitement. Sinon, si toutes les données étaient disponibles dans un seul répertoire et qu'aucune dépendance n'existait entre les fichiers, une option plus appropriée pourrait être de lancer une seule tâche de traitement avec plusieurs EC2 instances s3_data_distribution_type='ShardedByS3Key' spécifiées. Spécifier ShardedByS3Key  comme le type de distribution de données Amazon S3 indique SageMaker de gérer automatiquement le partitionnement des données entre les instances. 

Le lancement d'une tâche de traitement pour chaque dossier est un moyen rentable de prétraiter les données, car l'exécution simultanée de plusieurs instances permet de gagner du temps. Pour économiser du temps et des coûts supplémentaires, vous pouvez utiliser le microsharding dans chaque tâche de traitement. 

Microsharding et parallélisme vCPUs

Au sein de chaque tâche de traitement, les données groupées sont ensuite divisées afin de maximiser l'utilisation de toutes les données disponibles vCPUs sur l'EC2instance SageMaker entièrement gérée. Les blocs situés dans la partie centrale du diagramme (2) décrivent ce qui se passe dans le cadre de chaque tâche de traitement principale. Le contenu des dossiers des patients est aplati et divisé de manière égale en fonction du nombre de dossiers disponibles vCPUs sur l'instance. Une fois le contenu du dossier divisé, l'ensemble de fichiers de taille uniforme est réparti entre tous vCPUs pour être traité. Lorsque le traitement est terminé, les résultats de chaque v CPU sont combinés dans un seul fichier de données pour chaque tâche de traitement. 

Dans le code joint, ces concepts sont représentés dans la section suivante du src/feature-engineering-pass1/preprocessing.py fichier.

def chunks(lst, n):     """     Yield successive n-sized chunks from lst.          :param lst: list of elements to be divided     :param n: number of elements per chunk     :type lst: list     :type n: int     :return: generator comprising evenly sized chunks     :rtype: class 'generator'     """     for i in range(0, len(lst), n):         yield lst[i:i + n]     # Generate list of data files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']   # Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))   # Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)   # Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))

Une fonction est d'abord définie pour consommer une liste donnée en la divisant en morceaux de taille égale et en renvoyant ces résultats sous forme de générateur. chunks Ensuite, les données sont aplaties dans les dossiers des patients en compilant une liste de tous les fichiers de formes d'onde binaires présents. Une fois cela fait, le nombre de fichiers vCPUs disponibles sur l'EC2instance est obtenu. La liste des fichiers de formes d'onde binaires est répartie uniformément entre ceux-ci vCPUs par appelchunks, puis chaque sous-liste de formes d'onde est traitée séparément en CPU utilisant la classe Parallel de joblib. Les résultats sont automatiquement combinés dans une liste unique de dataframes par la tâche de traitement, qui poursuit SageMaker ensuite le traitement avant de les écrire dans Amazon S3 une fois la tâche terminée. Dans cet exemple, 10 fichiers ont été écrits sur Amazon S3 par les tâches de traitement (un pour chaque tâche).

Lorsque toutes les tâches de traitement initiales sont terminées, une tâche de traitement secondaire, illustrée dans le bloc à droite du diagramme (3), combine les fichiers de sortie produits par chaque tâche de traitement principale et écrit la sortie combinée sur Amazon S3 (4).

Outils

Outils

  • Python — L'exemple de code utilisé pour ce modèle est Python (version 3).

  • SageMaker Studio — Amazon SageMaker Studio est un environnement de développement intégré basé sur le Web (IDE) pour l'apprentissage automatique qui vous permet de créer, de former, de déboguer, de déployer et de surveiller vos modèles d'apprentissage automatique. Vous exécutez SageMaker des tâches de traitement en utilisant des blocs-notes Jupyter dans Studio. SageMaker

  • SageMaker Traitement — Amazon SageMaker Processing fournit un moyen simplifié d'exécuter vos charges de travail de traitement des données. Dans ce modèle, le code d'ingénierie des fonctionnalités est implémenté à grande échelle à l'aide de tâches SageMaker de traitement.

Code

Le fichier .zip joint fournit le code complet de ce modèle. La section suivante décrit les étapes à suivre pour créer l'architecture de ce modèle. Chaque étape est illustrée par un exemple de code extrait de la pièce jointe.

Épopées

TâcheDescriptionCompétences requises
Accédez à Amazon SageMaker Studio.

Accédez à SageMaker Studio depuis votre AWS compte en suivant les instructions fournies dans la SageMaker documentation Amazon.

Scientifique des données, ingénieur ML
Installez l'utilitaire wget.

Installez wget si vous avez intégré une nouvelle configuration de SageMaker Studio ou si vous n'avez jamais utilisé ces utilitaires dans SageMaker Studio auparavant. 

Pour l'installer, ouvrez une fenêtre de terminal dans la console SageMaker Studio et exécutez la commande suivante :

sudo yum install wget
Scientifique des données, ingénieur ML
Téléchargez et décompressez l'exemple de code.

Téléchargez le attachments.zip fichier dans la section Pièces jointes. Dans une fenêtre de terminal, accédez au dossier dans lequel vous avez téléchargé le fichier et extrayez son contenu :

unzip attachment.zip

Accédez au dossier dans lequel vous avez extrait le fichier .zip et extrayez le contenu du Scaled-Processing.zip fichier.

unzip Scaled-Processing.zip
Scientifique des données, ingénieur ML
Téléchargez l'exemple de jeu de données sur physionet.org et chargez-le sur Amazon S3.

Exécutez le bloc-notes get_data.ipynb Jupyter dans le dossier contenant les Scaled-Processing fichiers. Ce bloc-notes télécharge un exemple MIMIC de III jeu de données depuis physionet.org et le charge dans votre compartiment de session SageMaker Studio dans Amazon S3.

Scientifique des données, ingénieur ML
TâcheDescriptionCompétences requises
Aplatissez la hiérarchie des fichiers dans tous les sous-répertoires.

Dans les ensembles de données volumineux tels que MIMIC -III, les fichiers sont souvent répartis dans plusieurs sous-répertoires, même au sein d'un groupe parent logique. Votre script doit être configuré pour aplatir tous les fichiers de groupe dans tous les sous-répertoires, comme le montre le code suivant.

# Generate list of .dat files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']

Remarque Les exemples d'extraits de code présentés dans cette épopée proviennent du src/feature-engineering-pass1/preprocessing.py fichier fourni en pièce jointe.

Scientifique des données, ingénieur ML
Divisez les fichiers en sous-groupes en fonction du CPU nombre de v.

Les fichiers doivent être divisés en sous-groupes ou segments de taille égale, en fonction du nombre de fichiers vCPUs présents sur l'instance qui exécute le script. Pour cette étape, vous pouvez implémenter un code similaire au suivant.

# Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))
Scientifique des données, ingénieur ML
Parallélisez le traitement des sous-groupes entre eux. vCPUs

La logique du script doit être configurée pour traiter tous les sous-groupes en parallèle. Pour ce faire, utilisez la Parallel  classe et la delayed  méthode de la bibliothèque Joblib comme suit. 

# Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
Scientifique des données, ingénieur ML
Enregistrez la sortie d'un seul groupe de fichiers sur Amazon S3.

Lorsque le CPU traitement en parallel v est terminé, les résultats de chaque v CPU doivent être combinés et téléchargés dans le chemin du compartiment S3 du groupe de fichiers. Pour cette étape, vous pouvez utiliser un code similaire au suivant.

# Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))
Scientifique des données, ingénieur ML
TâcheDescriptionCompétences requises
Combinez les fichiers de données produits dans toutes les tâches de traitement qui ont exécuté le premier script.

Le script précédent génère un fichier unique pour chaque tâche de SageMaker traitement qui traite un groupe de fichiers de l'ensemble de données.  Ensuite, vous devez combiner ces fichiers de sortie en un seul objet et écrire un ensemble de données de sortie unique sur Amazon S3. Cela est démontré dans le src/feature-engineering-pass1p5/preprocessing.py fichier, qui est fourni en pièce jointe, comme suit.

def write_parquet(wavs_df, path):     """     Write waveform summary dataframe to S3 in parquet format.          :param wavs_df: waveform summary dataframe     :param path: S3 directory prefix     :type wavs_df: pandas dataframe     :type path: str     :return: None     """     extra_args = {"ServerSideEncryption": "aws:kms"}     wr.s3.to_parquet(         df=wavs_df,         path=path,         compression='snappy',         s3_additional_kwargs=extra_args)     def combine_data():     """     Get combined data and write to parquet.          :return: waveform summary dataframe     :rtype: pandas dataframe     """     wavs_df = get_data()     wavs_df = normalize_signal_names(wavs_df)     write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))       return wavs_df     wavs_df = combine_data()
Scientifique des données, ingénieur ML
TâcheDescriptionCompétences requises
Exécutez la première tâche de traitement.

Pour effectuer le macrosharding, exécutez une tâche de traitement distincte pour chaque groupe de fichiers. Le microsharding est effectué dans chaque tâche de traitement, car chaque tâche exécute votre premier script. Le code suivant montre comment lancer une tâche de traitement pour chaque répertoire de groupe de fichiers dans l'extrait suivant (inclus dansnotebooks/FeatExtract_Pass1.ipynb).

pat_groups = list(range(30,40)) ts = str(int(time.time()))   for group in pat_groups:     sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                      role=role,                                      instance_type='ml.m5.4xlarge',                                      instance_count=1,                                      volume_size_in_gb=5)     sklearn_processor.run(         code='../src/feature-engineering-pass1/preprocessing.py',         job_name='-'.join(['scaled-processing-p1', str(group), ts]),         arguments=[             "input_path", "/opt/ml/processing/input",             "output_path", "/opt/ml/processing/output",             "group_data_out", "ws_df_group.json"         ],         inputs=         [             ProcessingInput(                 source=f's3://{sess.default_bucket()}/data_inputs/{group}',                 destination='/opt/ml/processing/input',                 s3_data_distribution_type='FullyReplicated'             )         ],         outputs=         [             ProcessingOutput(                 source='/opt/ml/processing/output',                 destination=f's3://{sess.default_bucket()}/data_outputs/{group}'             )         ],         wait=False     )
Scientifique des données, ingénieur ML
Exécutez le deuxième travail de traitement.

Pour combiner les sorties générées par le premier ensemble de tâches de traitement et effectuer des calculs supplémentaires pour le prétraitement, vous devez exécuter votre deuxième script à l'aide d'une seule tâche de SageMaker traitement. Le code suivant illustre cela (inclus dansnotebooks/FeatExtract_Pass1p5.ipynb).

ts = str(int(time.time())) bucket = sess.default_bucket()       sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                  role=role,                                  instance_type='ml.t3.2xlarge',                                  instance_count=1,                                  volume_size_in_gb=5) sklearn_processor.run(     code='../src/feature-engineering-pass1p5/preprocessing.py',     job_name='-'.join(['scaled-processing', 'p1p5', ts]),     arguments=['bucket', bucket,                'pass1out_prefix', 'data_outputs',                'pass1out_data', 'ws_df_group.json',                'pass1p5out_data', 'waveform_summary.parquet',                'statsdata_name', 'signal_stats.csv'],     wait=True )
Scientifique des données, ingénieur ML

Ressources connexes

Pièces jointes

Pour accéder au contenu supplémentaire associé à ce document, décompressez le fichier suivant : attachment.zip