本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 Processing 对 TB 级机器学习 SageMaker 数据集进行分布式特征工程
由 Chris Boomhower 创作 () AWS
摘要
许多 TB 级或更大的数据集通常由分层文件夹结构构成,数据集中的文件有时会共享相互依存关系。因此,机器学习 (ML) 工程师和数据科学家必须做出深思熟虑的决策,为模型训练和推理准备此类数据。此模式演示了如何将手动宏分片和微分片技术与 Amazon Processing 和虚拟 CPU (vCPU) 并行化相结合,为复杂的大数据 ML SageMaker 数据集高效扩展特征工程流程。
这种模式将宏分片定义为在多台计算机上拆分数据目录进行处理,将微分片定义为将每台计算机上的数据分割至多个处理线程中。该模式通过使用 Amazon SageMaker 和数据集中的时间序列波形记录样本来演示这些技术。PhysioNet MIMIC III
先决条件和限制
先决条件
如果您想为自己的数据集实现此模式,则可以访问 SageMaker 笔记本实例或 SageMaker Studio。如果您是首次使用亚马逊 SageMaker ,请参阅AWS文档 SageMaker中的亚马逊入门。
该模式使用 SageMaker 处理,但不需要任何运行 SageMaker 处理作业的经验。
限制
这种模式非常适合包含相互依赖文件的机器学习数据集。手动宏分片和并行运行多个单实例 Processing SageMaker 作业对这些相互依赖关系的益处最大。对于不存在此类相互依赖关系的数据集,Processing 中的
ShardedByS3Key
SageMaker 功能可能是宏分片的更好替代方案,因为它会将分片数据发送到由同一 Processing 作业管理的多个实例。但是,您可以在这两种情况下实现这种模式的微分片策略,以充分利用实例。vCPUs
产品版本
亚马逊 SageMaker Python 第 2 SDK 版
架构
目标技术堆栈
Amazon Simple Storage Service(Amazon S3)
Amazon SageMaker
目标架构
宏分片和分布式实例 EC2
该架构中代表的 10 个并行进程反映了 MIMIC-III 数据集的结构。(为了简化逻辑示意图,流程用省略号表示。) 当您使用手动宏分片时,类似架构适用于任何数据集。如果是 MIMIC-III,您可以毫不费力地单独处理每个患者组文件夹,从而充分利用数据集的原始结构。下图中的记录组块出现在左侧 (1)。鉴于数据分布性质,按患者群体进行分片是有意义的。
但是,按患者组手动分片意味着每个患者组文件夹都需要单独的处理作业,如您在图 (2) 的中间部分所见,而不是具有多个EC2实例的单个处理作业。由于 MIMIC-的数据包括二进制波形文件和匹配III的基于文本的头文件,并且需要依赖 wfdb 库s3_data_distribution_type='FullyReplicated'
何时定义处理作业输入。或者,如果所有数据都位于一个目录中,并且文件之间不存在依赖关系,则更合适的选择可能是启动具有多个EC2实例并s3_data_distribution_type='ShardedByS3Key'
指定的单个处理作业。指定ShardedByS3Key
为 Amazon S3 数据分配类型 SageMaker 可自动管理跨实例的数据分片。
为每个文件夹启动 Processing 作业是经济高效的预处理数据的方式,因为同时运行多个实例可以节省时间。为进一步节省成本和时间,您可以在每个处理作业中使用微分片。
微分片和 parallel vCPUs
在每个 Processing 作业中,对分组的数据进行进一步划分,以最大限度地利用 SageMaker 完全托管EC2实例 vCPUs 上的所有可用数据。图 (2) 中间部分的方块描述了每个主要处理任务中的情况。患者记录文件夹的内容是扁平化的,并根据实例 vCPUs 上的可用数量平均分配。分割文件夹内容后,大小均匀的一组文件将分布在所有文件中 vCPUs 进行处理。处理完成后,每个 v CPU 的结果将合并到每个 Processing 作业的单个数据文件中。
在随附的代码中,这些概念在 src/feature-engineering-pass1/preprocessing.py
文件的下一节中进行了介绍。
def chunks(lst, n): """ Yield successive n-sized chunks from lst. :param lst: list of elements to be divided :param n: number of elements per chunk :type lst: list :type n: int :return: generator comprising evenly sized chunks :rtype: class 'generator' """ for i in range(0, len(lst), n): yield lst[i:i + n] # Generate list of data files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs: file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat'] # Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits)) # Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks) # Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))
chunks
函数首先被定义为使用给定列表,方法是将其分成大小均匀的长度 n
,然后将这些结果作为生成器返回。接下来,通过编译所有存在的二进制波形文件列表,在患者文件夹中对数据进行扁平化。完成此操作后,将获得EC2实例上的 vCPUs 可用数量。 vCPUs 通过调用将二进制波形文件列表平均分成这些文件chunks
,然后使用 jobli
当所有初始处理任务完成后,辅助处理任务(如图 (3) 右侧的方块所示)将合并每个主处理任务生成的输出文件,并将合并后的输出写入 Amazon S3 (4)。
工具
工具
Python
— 用于此模式的示例代码是 Python(版本 3)。 SageMaker Stud io — Amazon SageMaker Studio 是一个用于机器学习的基于 Web 的集成开发环境 (IDE),允许您构建、训练、调试、部署和监控您的机器学习模型。您可以在 Studio 中 SageMaker 使用 Jupyter 笔记本来运行 SageMaker 处理作业。
SageMaker 处理 — Amazon P SageMaker rocessing 提供了一种运行数据处理工作负载的简化方法。在这种模式中,特征工程代码是通过使用 SageMaker 处理作业大规模实现的。
代码
随附的 .zip 文件提供了此模式的完整代码。以下部分介绍为此模式构建架构的步骤。附件中的示例代码介绍了每个步骤。
操作说明
任务 | 描述 | 所需技能 |
---|---|---|
访问亚马逊 SageMaker 工作室。 | 按照亚马逊 SageMaker 文档中提供的说明使用您的AWS账户登录 SageMaker Studio。 | 数据科学家、机器学习工程师 |
安装 wget 实用程序。 | 如果您已使用新的 SageMaker Studio 配置或以前从未在 Studio 中 SageMaker 使用过这些实用程序,请安装 wg et。 要进行安装,请在 SageMaker Studio 控制台中打开终端窗口并运行以下命令:
| 数据科学家、机器学习工程师 |
下载并解压缩示例代码。 | 在附件部分下载
导航到提取 .zip 文件的文件夹,然后提取
| 数据科学家、机器学习工程师 |
从 physionet.org 下载示例数据集,并将其上传到 Amazon S3。 | 在包含 | 数据科学家、机器学习工程师 |
任务 | 描述 | 所需技能 |
---|---|---|
扁平化所有子目录的文件层次结构。 | 在诸如 MIMIC-之类的大型数据集中III,文件通常分布在多个子目录中,即使在逻辑父组中也是如此。您的脚本应配置为扁平化所有子目录中所有组文件,如以下代码所示。
注意 此长篇故事中的示例代码片段来自该 | 数据科学家、机器学习工程师 |
根据 v CPU 计数将文件划分为子组。 | 应根据运行脚本的实例上 vCPUs 存在的文件数量将文件分成大小相等的子组或块。在此步骤中,可实现类似于以下代码的代码。
| 数据科学家、机器学习工程师 |
并行处理子组。vCPUs | 应将脚本逻辑配置为并行处理所有的子组。为此,请按如下方式使用 Joblib 库 的
| 数据科学家、机器学习工程师 |
将单个文件组的输出保存至 Amazon S3。 | paralle CPU l v 处理完成后,CPU应合并每个 v 的结果并将其上传到文件组的 S3 存储桶路径。在此步骤,可以使用类似于以下代码的代码。
| 数据科学家、机器学习工程师 |
任务 | 描述 | 所需技能 |
---|---|---|
合并运行第一个脚本的所有 Processing 作业所生成的数据文件。 | 前面的脚本为 SageMaker 处理数据集中的一组文件的每个处理作业输出一个文件。 接下来,您需要将这些输出文件合并至一个对象,并将单个输出数据集写入 Amazon S3。文件中对此进行了演示,此
| 数据科学家、机器学习工程师 |
任务 | 描述 | 所需技能 |
---|---|---|
运行第一项处理作业。 | 若要执行宏分片,请为每个文件组运行单独的处理作业。Microsharding 是在每个 Processing 作业中执行,因为每个作业都会运行您的第一个脚本。以下代码演示了如何为以下片段(包含在
| 数据科学家、机器学习工程师 |
运行第二个处理作业。 | 要合并第一组处理作业生成的输出并执行任何其他计算以进行预处理,请使用单 SageMaker 个 Processing 作业运行第二个脚本。以下代码演示了这一点(包含在
| 数据科学家、机器学习工程师 |
相关资源
使用快速入门登录 Amazon SageMaker Studio(SageMaker 文档)
流程数据(SageMaker 文档)
使用 scikit-learn 进行数据处理(文档)SageMaker
Moody, B., Moody, G., Villarroel, M., Clifford, G. D., & Silva, I. (2020). MIMIC-III 波形数据库
(版本 1.0)。 PhysioNet。 Johnson、A.E.W.、Pollard、T.J.、Shen、L.、Lehman、L.、Feng、M.、Ghassemi、M.、Moody、B.、Szolovits、P.、Celi、L.A. 和 Mark,R.G.(2016)。 MIMIC-III,一个可免费访问的重症监护数据库
。Scientific Data, 3, 160035.
附件
要访问与此文档相关联的其他内容,请解压以下文件:attachment.zip