使用 Processing 对 TB 级机器学习 SageMaker 数据集进行分布式特征工程 - AWS Prescriptive Guidance

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Processing 对 TB 级机器学习 SageMaker 数据集进行分布式特征工程

由 Chris Boomhower (AWS) 编写

环境:生产

技术:机器学习和人工智能;大数据

AWS 服务:亚马逊 SageMaker

总结

许多 TB 级或更大的数据集通常由分层文件夹结构构成,数据集中的文件有时会共享相互依存关系。因此,机器学习 (ML) 工程师和数据科学家必须做出深思熟虑的决策,为模型训练和推理准备此类数据。此模式演示了如何将手动宏分片和微分片技术与 Amazon Processing 和虚拟 CPU (vCPU) 并行化相结合,为复杂的大数据 ML 数据集高效扩展功能工程 SageMaker 流程。 

这种模式将宏分片定义为在多台计算机上拆分数据目录进行处理,将微分片定义为将每台计算机上的数据分割至多个处理线程中。该模式通过使用 Amazon 和 PhysioNet MIM IC-II SageMaker I 数据集中的时间序列波形记录样本来演示这些技术。通过采用这种模式技术,您可以最大限度地减少特征工程的处理时间和成本,同时最大限度地提高资源利用率和吞吐量效率。无论数据类型如何,这些优化都依赖于亚马逊弹性计算云 (Amazon EC2) 实例和 vCPU 上的分布式 SageMaker 处理,以处理类似的大型数据集。

先决条件和限制

先决条件

  • 如果您想为自己的数据集实现此模式,则可以访问 SageMaker 笔记本实例或 SageMaker Studio。如果您是首次使用亚马逊 SageMaker ,请参阅 AWS 文档 SageMaker中的亚马逊入门

  • SageMaker Studio,如果你想用 PhysioNet MIMIC-III 样本数据实现这种模式。 

  • 该模式使用 SageMaker 处理,但不需要任何运行 SageMaker 处理作业的经验。

限制

  • 这种模式非常适合包含相互依赖文件的机器学习数据集。手动宏分片和并行运行多个单实例 Processing SageMaker 作业对这些相互依赖关系的益处最大。对于不存在此类相互依赖关系的数据集,Processing 中的ShardedByS3Key SageMaker 功能可能是宏分片的更好替代方案,因为它会将分片数据发送到由同一 Processing 作业管理的多个实例。但是,您可在这两种情况下实现这种模式的微分片策略,以最好地利用实例 vCPU。

产品版本

  • 亚马逊 SageMaker Python 软件开发工具包版本 2

架构

目标技术堆栈

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon SageMaker

目标架构

宏分片与分布式 EC2 实例

该架构中代表的 10 个并行进程反映了 MIMIC-III 数据集结构。(为了简化逻辑示意图,流程用省略号表示。) 当您使用手动宏分片时,类似架构适用于任何数据集。就 MIMIC-III 而言,您可以毫不费力地单独处理每个患者组文件夹,从而充分利用数据集的原始结构。下图中的记录组块出现在左侧 (1)。鉴于数据分布性质,按患者群体进行分片是有意义的。

微分片和分布式 EC2 实例架构

但是,按患者组手动分片意味着每个患者组文件夹都需要单独的处理任务,如您在图 (2) 中间部分所见,而不是具有多个 EC2 实例的单个处理任务。由于 MIMIC-III 的数据包括二进制波形文件和匹配的基于文本的头文件,并且需要依赖 wfdb 库提取二进制数据,因此特定患者的所有记录都必须在同一个实例上可用。要确保每个二进制波形文件的关联头文件也存在,唯一的方法是实现手动分片,以在自己的处理作业中运行每个分片,并指定s3_data_distribution_type='FullyReplicated'何时定义处理作业输入。或者,如果所有数据位于一个目录中,并且文件之间不存在依赖项,则更合适的选择可能是启动一个包含多个指定 EC2 实例和s3_data_distribution_type='ShardedByS3Key'以发起单处理作业。指定ShardedByS3Key 为 Amazon S3 数据分配类型 SageMaker 可自动管理跨实例的数据分片。 

为每个文件夹启动 Processing 作业是经济高效的预处理数据的方式,因为同时运行多个实例可以节省时间。为进一步节省成本和时间,您可以在每个处理作业中使用微分片。 

微分片与并行 vCPU

在每个处理任务中,对分组的数据进行进一步划分,以最大限度地利用完全托管的 EC2 实例上的 SageMaker 所有可用 vCPU。图 (2) 中间部分的方块描述了每个主要处理任务中的情况。患者记录文件夹内容是扁平化的,并根据实例上可用 vCPU 的数量进行平均分配。分割文件夹内容后,大小均匀的文件将分布在所有 VCPU 上进行处理。处理完成后,每个 vCPU 结果将合并到每个处理作业的单个数据文件中。 

在随附的代码中,这些概念在src/feature-engineering-pass1/preprocessing.py 文件的下一节中进行了介绍。

def chunks(lst, n):     """     Yield successive n-sized chunks from lst.          :param lst: list of elements to be divided     :param n: number of elements per chunk     :type lst: list     :type n: int     :return: generator comprising evenly sized chunks     :rtype: class 'generator'     """     for i in range(0, len(lst), n):         yield lst[i:i + n]     # Generate list of data files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']   # Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))   # Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)   # Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))

chunks函数首先被定义为使用给定列表,方法是将其分成大小均匀的长度,然后将这些结果作为生成器返回。接下来,通过编译所有存在的二进制波形文件列表,在患者文件夹中对数据进行扁平化。完成此操作后,将获取 EC2 实例的可用 vCPU 数量。通过调用chunks将二进制波形文件列表平均分配到这些 vCPU 上,然后使用 joblib 的 Parallel 类在自己的 vCPU 上处理每个波形子列表。处理任务会自动将结果合并到一个数据帧列表中,然后处理任务会进一步处理, SageMaker 然后在任务完成后将其写入 Amazon S3。在此示例中,处理任务有 10 个文件写入 Amazon S3 (每个任务一个)。

当所有初始处理任务完成后,辅助处理任务 (如图 (3) 右侧的方块所示)将合并每个主处理任务生成的输出文件,并将合并后的输出写入 Amazon S3 (4)。

工具

工具

  • Python — 用于此模式的示例代码是 Python (版本 3)。

  • SageMaker Stud io — Amazon SageMaker Studio 是一个基于 Web 的机器学习集成开发环境 (IDE),允许您构建、训练、调试、部署和监控您的机器学习模型。您可以在 Studio 中 SageMaker 使用 Jupyter 笔记本来运行 SageMaker 处理作业。

  • SageMaker 处理 — Amazon P SageMaker rocessing 提供了一种运行数据处理工作负载的简化方法。在这种模式中,特征工程代码是通过使用 SageMaker 处理作业大规模实现的。

代码

随附的 .zip 文件提供了此模式的完整代码。以下部分介绍为此模式构建架构的步骤。附件中的示例代码介绍了每个步骤。

操作说明

任务描述所需技能
访问亚马逊 SageMaker 工作室。

按照亚马逊 SageMaker 文档中提供的说明使用您的 AWS 账户登录 SageMaker Studio。

数据科学家、机器学习工程师
安装 wget 实用程序。

如果您已使用新的 SageMaker Studio 配置或以前从未在 Studio 中 SageMaker 使用过这些实用程序,请安装 wg et。 

要进行安装,请在 SageMaker Studio 控制台中打开终端窗口并运行以下命令:

sudo yum install wget
数据科学家、机器学习工程师
下载并解压缩示例代码。

附件部分下载attachments.zip文件。在终端窗口,导航至下载文件并提取其内容的文件夹:

unzip attachment.zip

导航到提取 .zip 文件的文件夹,然后提取 Scaled-Processing.zip文件的内容。

unzip Scaled-Processing.zip
数据科学家、机器学习工程师
从 physionet.org 下载示例数据集,并将其上传到 Amazon S3。

在包含get_data.ipynb文件的文件夹中运行 Jupyter 笔记本Scaled-Processing。此笔记本从 p hysionet.org 下载示例 MIMIC-III 数据集,然后将其上传到亚马逊 S3 中的 Studio 会话存储桶 SageMaker 。

数据科学家、机器学习工程师
任务描述所需技能
扁平化所有子目录的文件层次结构。

在 MIMIC-III 等大型数据集,文件通常分布在多个子目录中,即使在逻辑父组中也是如此。您的脚本应配置为扁平化所有子目录中所有组文件,如以下代码所示。

# Generate list of .dat files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']

注意 此操作说明中的示例代码片段来自src/feature-engineering-pass1/preprocessing.py文件,该文件在附件中提供。

数据科学家、机器学习工程师
根据 vCPU 数量将文件划分至子组。

应根据运行脚本实例上存在的 vCPU 数量,将文件分成大小相等的子组或块。在此步骤中,可实现类似于以下代码的代码。

# Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))
数据科学家、机器学习工程师
在 vCPU 并行处理子组。

应将脚本逻辑配置为并行处理所有的子组。为此,请按如下方式使用 Joblib 库 的Parallel  类和 delayed 方法。 

# Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
数据科学家、机器学习工程师
将单个文件组的输出保存至 Amazon S3。

并行 vCPU 处理完成后,应合并每个 vCPU 的结果,并将其上传到文件组的 S3 存储桶路径。在此步骤,可以使用类似于以下代码的代码。

# Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))
数据科学家、机器学习工程师
任务描述所需技能
合并运行第一个脚本的所有 Processing 作业所生成的数据文件。

前面的脚本为每个 SageMaker 处理作业输出一个文件,该作业处理数据集中的一组文件。 接下来,您需要将这些输出文件合并至一个对象,并将单个输出数据集写入 Amazon S3。文件中对此进行了演示,此src/feature-engineering-pass1p5/preprocessing.py文件载于附件,如下所示。

def write_parquet(wavs_df, path):     """     Write waveform summary dataframe to S3 in parquet format.          :param wavs_df: waveform summary dataframe     :param path: S3 directory prefix     :type wavs_df: pandas dataframe     :type path: str     :return: None     """     extra_args = {"ServerSideEncryption": "aws:kms"}     wr.s3.to_parquet(         df=wavs_df,         path=path,         compression='snappy',         s3_additional_kwargs=extra_args)     def combine_data():     """     Get combined data and write to parquet.          :return: waveform summary dataframe     :rtype: pandas dataframe     """     wavs_df = get_data()     wavs_df = normalize_signal_names(wavs_df)     write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))       return wavs_df     wavs_df = combine_data()
数据科学家、机器学习工程师
任务描述所需技能
运行第一项处理作业。

若要执行宏分片,请为每个文件组运行单独的处理作业。Microsharding 是在每个 Processing 作业中执行,因为每个作业都会运行您的第一个脚本。以下代码演示了如何为以下片段 (包含在notebooks/FeatExtract_Pass1.ipynb) 中的每个文件组目录启动处理作业。

pat_groups = list(range(30,40)) ts = str(int(time.time()))   for group in pat_groups:     sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                      role=role,                                      instance_type='ml.m5.4xlarge',                                      instance_count=1,                                      volume_size_in_gb=5)     sklearn_processor.run(         code='../src/feature-engineering-pass1/preprocessing.py',         job_name='-'.join(['scaled-processing-p1', str(group), ts]),         arguments=[             "input_path", "/opt/ml/processing/input",             "output_path", "/opt/ml/processing/output",             "group_data_out", "ws_df_group.json"         ],         inputs=         [             ProcessingInput(                 source=f's3://{sess.default_bucket()}/data_inputs/{group}',                 destination='/opt/ml/processing/input',                 s3_data_distribution_type='FullyReplicated'             )         ],         outputs=         [             ProcessingOutput(                 source='/opt/ml/processing/output',                 destination=f's3://{sess.default_bucket()}/data_outputs/{group}'             )         ],         wait=False     )
数据科学家、机器学习工程师
运行第二个处理作业。

要合并第一组处理作业生成的输出并执行任何其他计算以进行预处理,请使用单 SageMaker 个 Processing 作业运行第二个脚本。以下代码演示了这一点 (包含在notebooks/FeatExtract_Pass1p5.ipynb)。

ts = str(int(time.time())) bucket = sess.default_bucket()       sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                  role=role,                                  instance_type='ml.t3.2xlarge',                                  instance_count=1,                                  volume_size_in_gb=5) sklearn_processor.run(     code='../src/feature-engineering-pass1p5/preprocessing.py',     job_name='-'.join(['scaled-processing', 'p1p5', ts]),     arguments=['bucket', bucket,                'pass1out_prefix', 'data_outputs',                'pass1out_data', 'ws_df_group.json',                'pass1p5out_data', 'waveform_summary.parquet',                'statsdata_name', 'signal_stats.csv'],     wait=True )
数据科学家、机器学习工程师

相关资源

附件

要访问与此文档相关联的其他内容,请解压以下文件:attachment.zip