Use SageMaker Processing for distributed feature engineering of terabyte-scale ML datasets
Created by Chris Boomhower (AWS)
Summary
Terabyte-scale and larger datasets often consist of a hierarchical folder structure, and the files in such datasets sometimes share interdependencies. For this reason, machine learning (ML) engineers and data scientists must make thoughtful design decisions to prepare such data for model training and inference. This pattern demonstrates how you can use manual macrosharding and microsharding techniques in combination with Amazon SageMaker Processing and virtual CPU (vCPU) parallelization to efficiently scale feature engineering processes for complicated big data ML datasets.
This pattern defines macrosharding as the splitting of data directories across multiple machines for processing, and microsharding as the splitting of data on each machine across multiple processing threads. The pattern demonstrates these techniques by using Amazon SageMaker with sample time-series waveform records from the PhysioNet MIMIC-III dataset.
Prerequisites and limitations
Prerequisites
Access to SageMaker notebook instances or SageMaker Studio, if you want to implement this pattern for your own dataset. If you are using Amazon SageMaker for the first time, see Get started with Amazon SageMaker in the AWS documentation.
SageMaker Studio, if you want to implement this pattern with the PhysioNet MIMIC-III sample data. The pattern uses SageMaker Processing, but doesn't require any experience running SageMaker Processing jobs.
Limitations
This pattern is well suited to ML datasets that include interdependent files. These interdependencies benefit the most from manual macrosharding and running multiple, single-instance SageMaker Processing jobs in parallel. For datasets where such interdependencies do not exist, the ShardedByS3Key feature in SageMaker Processing might be a better alternative to macrosharding, because it sends sharded data to multiple instances that are managed by the same Processing job. However, you can implement this pattern's microsharding strategy in both scenarios to best utilize instance vCPUs.
Product versions
Amazon SageMaker Python SDK version 2
Architecture
Target technology stack
Amazon Simple Storage Service (Amazon S3)
Amazon SageMaker
Target architecture
Macrosharding and distributed EC2 instances
The 10 parallel processes represented in this architecture reflect the structure of the MIMIC-III dataset. (Processes are represented by ellipses for diagram simplification.) A similar architecture applies to any dataset when you use manual macrosharding. In the case of MIMIC-III, you can use the dataset's raw structure to your advantage by processing each patient group folder separately, with minimal effort. In the following diagram, the record groups block appears on the left (1). Given the distributed nature of the data, it makes sense to shard by patient group.
However, manually sharding by patient group means that a separate Processing job is required for each patient group folder, as you can see in the middle section of the diagram (2), instead of a single Processing job with multiple EC2 instances. Because MIMIC-III's data includes both binary waveform files and matching text-based header files, and because the wfdb library depends on both files being present when it extracts a record, all of a patient group's files must be available to the same instance. To achieve this, specify s3_data_distribution_type='FullyReplicated' when you define the Processing job input. Alternatively, if all data were available in a single directory and no dependencies existed between files, a more suitable option might be to launch a single Processing job with multiple EC2 instances and s3_data_distribution_type='ShardedByS3Key' specified. Specifying ShardedByS3Key as the Amazon S3 data distribution type directs SageMaker to manage data sharding automatically across instances.
Launching a Processing job for each folder is a cost-efficient way to preprocess the data, because running multiple instances concurrently saves time. For additional cost and time savings, you can use microsharding within each Processing job.
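In code, the difference between the two approaches comes down to a single parameter on the Processing job input. The following minimal sketch, which is not part of the attached code, contrasts the two settings by using the SageMaker Python SDK; the S3 prefixes and container path are placeholders.

from sagemaker.processing import ProcessingInput

# Interdependent files (for example, a patient group's .dat and .hea pairs) must be
# available on the same instance, so each single-instance job receives a full copy
# of its group's data.
replicated_input = ProcessingInput(
    source="s3://amzn-s3-demo-bucket/mimic3wdb/matched/p00/",   # placeholder prefix
    destination="/opt/ml/processing/input",
    s3_data_distribution_type="FullyReplicated",
)

# Independent files can instead be split by S3 key across the instances of a single
# multi-instance Processing job.
sharded_input = ProcessingInput(
    source="s3://amzn-s3-demo-bucket/flat-dataset/",            # placeholder prefix
    destination="/opt/ml/processing/input",
    s3_data_distribution_type="ShardedByS3Key",
)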
Microsharding and parallel vCPUs
Within each Processing job, the grouped data is further divided to maximize use of all available vCPUs on the SageMaker fully managed EC2 instance. The blocks in the middle section of the diagram (2) depict what happens within each primary Processing job. The contents of the patient record folders are flattened and divided evenly based on the number of available vCPUs on the instance. After the folder contents are divided, the evenly sized sets of files are distributed across the vCPUs for processing. When processing is complete, the results from each vCPU are combined into a single data file for each Processing job.
In the attached code, these concepts are represented in the following section of the src/feature-engineering-pass1/preprocessing.py file.

def chunks(lst, n):
    """
    Yield successive n-sized chunks from lst.

    :param lst: list of elements to be divided
    :param n: number of elements per chunk
    :type lst: list
    :type n: int
    :return: generator comprising evenly sized chunks
    :rtype: class 'generator'
    """
    for i in range(0, len(lst), n):
        yield lst[i:i + n]

# Generate list of data files on machine
data_dir = input_dir
d_subs = next(os.walk(os.path.join(data_dir, '.')))[1]
file_list = []
for ds in d_subs:
    file_list.extend(os.listdir(os.path.join(data_dir, ds, '.')))
dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']

# Split list of files into sub-lists
cpu_count = multiprocessing.cpu_count()
splits = int(len(dat_list) / cpu_count)
if splits == 0: splits = 1
dat_chunks = list(chunks(dat_list, splits))

# Parallelize processing of sub-lists across CPUs
ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)

# Compile and pickle patient group dataframe
ws_df_group = pd.concat(ws_df_list)
ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'})
ws_df_group.to_json(os.path.join(output_dir, group_data_out))
A function, chunks, is first defined to consume a given list by dividing it into evenly sized chunks of length n and returning these results as a generator. Next, the data is flattened across patient folders by compiling a list of all binary waveform files that are present. After this is done, the number of vCPUs available on the EC2 instance is obtained. The list of binary waveform files is evenly divided across these vCPUs by calling chunks, and then each waveform sublist is processed on its own vCPU by using joblib's Parallel class.
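The run_process function that is called for each chunk is defined elsewhere in the attached script and is not reproduced here. Purely as an illustrative sketch, a worker of that shape might read each record with the wfdb package and return per-signal summary features; the use of the data_dir variable from the preceding snippet and the summary statistics shown are assumptions, not the attached implementation.

import os
import pandas as pd
import wfdb

def run_process(dat_sublist):
    """Illustrative sketch of a per-vCPU worker; the attached script defines its own version."""
    frames = []
    for rec in dat_sublist:
        # rdrecord reads the matching .dat/.hea pair for a record path given without extension
        record = wfdb.rdrecord(os.path.join(data_dir, rec))
        # Hypothetical features: per-signal summary statistics of the waveform samples
        stats = pd.DataFrame(record.p_signal, columns=record.sig_name).describe().T
        frames.append(stats)
    return pd.concat(frames)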
When all the initial Processing jobs are complete, a secondary Processing job, which is shown in the block to the right of the diagram (3), combines the output files produced by each primary Processing job and writes the combined output to Amazon S3 (4).
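The second script itself is provided in the attachment. Conceptually, it only has to read each group-level output file from its Processing input directory and concatenate the results; a minimal sketch along those lines follows, assuming JSON output files and placeholder file and path names.

import glob
import os
import pandas as pd

input_dir = "/opt/ml/processing/input"    # where SageMaker mounts the first-pass outputs
output_dir = "/opt/ml/processing/output"  # contents are uploaded to Amazon S3 when the job ends

# Gather every group-level JSON file produced by the first-pass Processing jobs
group_files = glob.glob(os.path.join(input_dir, "**", "*.json"), recursive=True)

# Concatenate the per-group dataframes into one combined dataset
combined_df = pd.concat([pd.read_json(f) for f in group_files], ignore_index=True)

# Write the combined output; the Processing job copies it to the configured S3 destination
combined_df.to_json(os.path.join(output_dir, "combined_features.json"))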
Tools
Tools
Python – The sample code used for this pattern is Python (version 3).
SageMaker Studio – Amazon SageMaker Studio is a web-based, integrated development environment (IDE) for machine learning that lets you build, train, debug, deploy, and monitor your machine learning models. You run SageMaker Processing jobs by using Jupyter notebooks inside SageMaker Studio.
SageMaker Processing – Amazon SageMaker Processing provides a simplified way to run your data processing workloads. In this pattern, the feature engineering code is implemented at scale by using SageMaker Processing jobs.
Code
The attached .zip file provides the complete code for this pattern. The following section describes the steps to build the architecture for this pattern. Each step is illustrated by sample code from the attachment.
Epics
Task | Description | Skills required |
---|---|---|
Access Amazon SageMaker Studio. | Onboard to SageMaker Studio in your AWS account by following the directions provided in the Amazon SageMaker documentation. | Data scientist, ML engineer |
Install the wget utility. | Install wget if you onboarded with a new SageMaker Studio configuration or if you've never used this utility in SageMaker Studio before. To install it, open a terminal window in the SageMaker Studio console and run the following command:
| Data scientist, ML engineer |
Download and unzip the sample code. | Download the
Navigate to the folder where you extracted the .zip file, and extract the contents of the
| Data scientist, ML engineer |
Download the sample dataset from physionet.org and upload it to Amazon S3. | Run the | Data scientist, ML engineer |
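The exact download and upload commands are in the attached notebook. As a rough illustration only, after the sample waveform files have been downloaded locally with wget, uploading them to Amazon S3 from a Studio notebook might look like the following; the local path and S3 prefix are placeholders.

import sagemaker

session = sagemaker.Session()
bucket = session.default_bucket()   # or any S3 bucket that you control

# Upload the locally downloaded PhysioNet sample data to S3, preserving its folder structure
s3_uri = session.upload_data(
    path="mimic3wdb/matched",         # placeholder local folder created by the download step
    bucket=bucket,
    key_prefix="mimic3wdb/matched",   # placeholder S3 prefix
)
print(s3_uri)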
Task | Description | Skills required |
---|---|---|
Flatten the file hierarchy across all subdirectories. | In large datasets such as MIMIC-III, files are often distributed across multiple subdirectories even within a logical parent group. Your script should be configured to flatten all group files across all subdirectories, as the following code demonstrates.
Note: The example code snippets in this epic are from the | Data scientist, ML engineer |
Divide files into subgroups based on vCPU count. | Files should be divided into evenly sized subgroups, or chunks, depending on the number of vCPUs present on the instance that runs the script. For this step, you can implement code similar to the following.
| Data scientist, ML engineer |
Parallelize processing of subgroups across vCPUs. | Script logic should be configured to process all subgroups in parallel. To do this, use the Joblib library's Parallel class, as the following code demonstrates.
| Data scientist, ML engineer |
Save single file group output to Amazon S3. | When parallel vCPU processing is complete, the results from each vCPU should be combined and uploaded to the file group's S3 bucket path. For this step, you can use code similar to the following.
| Data scientist, ML engineer |
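For context on where the input_dir and output_dir variables used by the processing script come from: inside a SageMaker Processing container, inputs and outputs are mounted under /opt/ml/processing/ at the destinations configured on the job. One common way to wire this up, shown here only as an assumption about how a script such as preprocessing.py might do it, is to accept the paths as arguments with defaults.

import argparse

# Resolve the container paths for the Processing job's input and output channels.
parser = argparse.ArgumentParser()
parser.add_argument("--input-dir", default="/opt/ml/processing/input")
parser.add_argument("--output-dir", default="/opt/ml/processing/output")
args, _ = parser.parse_known_args()

input_dir = args.input_dir    # consumed by the file-flattening logic shown earlier
output_dir = args.output_dir  # files written here are uploaded to the job's S3 output location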
Task | Description | Skills required |
---|---|---|
Combine data files produced across all Processing jobs that ran the first script. | The previous script outputs a single file for each SageMaker Processing job that processes a group of files from the dataset. Next, you need to combine these output files into a single object and write a single output dataset to Amazon S3. This is demonstrated in the
| Data scientist, ML engineer |
Task | Description | Skills required |
---|---|---|
Run the first Processing job. | To perform macrosharding, run a separate Processing job for each file group. Microsharding is performed inside each Processing job, because each job runs your first script. The following code demonstrates how to launch a Processing job for each file group directory (included in
| Data scientist, ML engineer |
Run the second Processing job. | To combine the outputs generated by the first set of Processing jobs and perform any additional preprocessing computations, you run your second script by using a single SageMaker Processing job. The following code demonstrates this (included in
| Data scientist, ML engineer |
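Putting both epics together, the notebook-side orchestration might look roughly like the following sketch. This is not the attached notebook code: the bucket name, patient group folder names, framework version, instance type, S3 prefixes, and the pass-2 script path are all placeholders or assumptions.

from sagemaker import get_execution_role
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput

role = get_execution_role()
bucket = "amzn-s3-demo-bucket"                        # placeholder bucket
groups = [f"p{i:02d}" for i in range(10)]             # placeholder patient group folders

# Pass 1: one single-instance Processing job per patient group (macrosharding)
for group in groups:
    processor = SKLearnProcessor(
        framework_version="0.23-1",                   # placeholder framework version
        role=role,
        instance_type="ml.m5.4xlarge",                # placeholder instance type
        instance_count=1,
    )
    processor.run(
        code="src/feature-engineering-pass1/preprocessing.py",
        inputs=[ProcessingInput(
            source=f"s3://{bucket}/mimic3wdb/matched/{group}/",
            destination="/opt/ml/processing/input",
            s3_data_distribution_type="FullyReplicated",
        )],
        outputs=[ProcessingOutput(
            source="/opt/ml/processing/output",
            destination=f"s3://{bucket}/features-pass1/{group}/",
        )],
        wait=False,                                   # launch the group jobs concurrently
    )

# Pass 2 (run after all pass-1 jobs finish): a single job that combines the group outputs
combiner = SKLearnProcessor(
    framework_version="0.23-1",
    role=role,
    instance_type="ml.m5.4xlarge",
    instance_count=1,
)
combiner.run(
    code="src/feature-engineering-pass2/preprocessing.py",  # script path is an assumption
    inputs=[ProcessingInput(
        source=f"s3://{bucket}/features-pass1/",
        destination="/opt/ml/processing/input",
    )],
    outputs=[ProcessingOutput(
        source="/opt/ml/processing/output",
        destination=f"s3://{bucket}/features-final/",
    )],
)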
Related resources
Onboard to Amazon SageMaker Studio Using Quick Start (SageMaker documentation)
Process Data (SageMaker documentation)
Data Processing with scikit-learn (SageMaker documentation)
Moody, B., Moody, G., Villarroel, M., Clifford, G. D., & Silva, I. (2020). MIMIC-III Waveform Database (version 1.0). PhysioNet.
Johnson, A. E. W., Pollard, T. J., Shen, L., Lehman, L. H., Feng, M., Ghassemi, M., Moody, B., Szolovits, P., Celi, L. A., & Mark, R. G. (2016). MIMIC-III, a freely accessible critical care database. Scientific Data, 3, 160035.
Attachments
To access additional content that is associated with this document, unzip the following file: attachment.zip