使用 SageMaker 處理對 TB 規模的 ML 資料集進行分散式特徵工程 - AWS 方案指引

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 SageMaker 處理對 TB 規模的 ML 資料集進行分散式特徵工程

由 Chris Boomhower 建立 (AWS)

環境:生產

技術:機器學習和 AI;大數據

AWS 服務:Amazon SageMaker

Summary

許多 TB 規模或更大的資料集通常由階層式資料夾結構組成,而且資料集中的檔案有時會共用相互依存性。因此,機器學習 (ML) 工程師和資料科學家必須做出深思熟慮的設計決策,以準備此類資料以進行模型訓練和推論。此模式示範如何結合 Amazon SageMaker Processing 和虛擬 CPU(vCPU) 平行處理,使用手動巨集分割和微分割技術,以有效擴展複雜巨量資料 ML 資料集的特徵工程程序。 

此模式將巨集分割定義為跨多台機器進行處理的資料目錄分割,並將微分割定義為跨多個處理執行緒在每個機器上的資料分割。此模式透過使用 Amazon SageMaker 搭配 PhysioNet MIMIC-III 資料集的範例時間序列波形記錄來示範這些技術。透過在此模式中實作技術,您可以將特徵工程的處理時間和成本降至最低,同時最大化資源使用率和輸送量效率。這些最佳化仰賴 Amazon Elastic Compute Cloud (Amazon EC2) 執行個體上的分散式 SageMaker 處理,以及 vCPUs 類似、大型資料集,無論資料類型為何。

先決條件和限制

先決條件

  • 如果您想要為自己的資料集實作此模式,請存取 SageMaker 筆記本執行個體或 SageMaker Studio。如果您是 SageMaker 第一次使用 Amazon,請參閱 AWS 文件中的開始使用 Amazon SageMaker

  • SageMaker Studio,如果您想要使用 PhysioNet MIMIC範例III資料實作此模式。 

  • 此模式使用 SageMaker 處理,但不需要執行 SageMaker 處理任務的任何經驗。

限制

  • 此模式非常適合包含相互依賴檔案的 ML 資料集。這些相互依存性受益於手動巨集分割,並平行執行多個單一執行個體 SageMaker 處理任務。對於不存在此類相互依存性的資料集, SageMaker 處理中的ShardedByS3Key功能可能是巨集控制更好的替代方案,因為它會將碎片資料傳送至由相同處理任務管理的多個執行個體。不過,您可以在這兩種情況下實作此模式的微分策略,以充分利用執行個體 vCPUs。

產品版本

  • Amazon SageMaker Python 第 2 SDK版

架構

目標技術堆疊

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon SageMaker

目標架構

巨集控制和分散式EC2執行個體

此架構中代表的 10 個平行程序反映 MIMIC-III 資料集的結構。(程序以省略符號表示,以簡化圖表。) 當您使用手動巨集分割時,類似的架構會套用至任何資料集。在 MIMIC- 中III,您可以藉由盡可能分別處理每個病患群組資料夾,來使用資料集的原始結構。在下圖中,記錄群組區塊會出現在左側 (1)。鑑於資料的分散式性質,依病患群組碎片化很合理。

Microsharding 和分散式EC2執行個體的架構

不過,依病患群組手動分割表示每個病患群組資料夾都需要單獨的處理任務,如圖表 (2) 中間區段所示,而不是具有多個EC2執行個體的單一處理任務。由於 MIMICIII的資料包含二進位波形檔案和相符的文字型標頭檔案,而且需要依賴 wfdb 程式庫才能擷取二進位資料,因此特定病患的所有記錄都必須在同一個執行個體上提供。唯一可以確定每個二進位波形檔案的關聯標頭檔案也存在的方法,是實作手動分割,以在自己的處理任務中執行每個碎片,並指定s3_data_distribution_type='FullyReplicated'何時定義處理任務輸入。或者,如果所有資料在單一目錄中可用,而且檔案之間不存在相依性,則更合適的選項可能是啟動具有多個EC2執行個體且s3_data_distribution_type='ShardedByS3Key'指定的單一處理任務。指定ShardedByS3Key 為 Amazon S3 資料分佈類型會指示 SageMaker 自動管理跨執行個體的資料共用。 

為每個資料夾啟動處理任務是預先處理資料的成本效益方法,因為同時執行多個執行個體可節省時間。為了節省額外的成本和時間,您可以在每個處理任務中使用微分割。 

Microsharding 和平行處理 vCPUs

在每個處理任務中,會進一步分割分組的資料,以最大化 SageMaker 完全受管EC2執行個體 vCPUs 上所有可用的使用。圖表 (2) 中間區段中的區塊描述了每個主要處理任務中發生的情況。病患記錄資料夾的內容會根據執行個體 vCPUs 上可用的數量,平均地扁平化和分割。分割資料夾內容後,平均大小的檔案集會分佈到所有 vCPUs 以進行處理。處理完成後,每個 vCPU 的結果會合併為每個處理任務的單一資料檔案。 

在附加的程式碼中,這些概念會在 src/feature-engineering-pass1/preprocessing.py 檔案的下一節中表示。

def chunks(lst, n):     """     Yield successive n-sized chunks from lst.          :param lst: list of elements to be divided     :param n: number of elements per chunk     :type lst: list     :type n: int     :return: generator comprising evenly sized chunks     :rtype: class 'generator'     """     for i in range(0, len(lst), n):         yield lst[i:i + n]     # Generate list of data files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']   # Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))   # Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)   # Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))

函數 chunks會先定義為透過將指定清單分割為均勻大小的長度區塊,並將這些結果傳回為產生器來取用指定的清單。接下來,透過編譯所有存在的二進位波形檔案清單,將資料平整化至病患資料夾。完成後,就會取得EC2執行個體上 vCPUs 可用的 數目。二元波形檔案的清單 vCPUs 透過呼叫 等分chunks,然後每個波形子清單都會使用 joblib 的平行類別 自行處理 vCPU。處理任務會自動將結果合併為單一資料架構清單, SageMaker 然後進一步處理,然後在任務完成後寫入 Amazon S3。在此範例中,處理任務寫入 Amazon S3 的檔案有 10 個 (每個任務一個)。

當所有初始處理任務完成時,次要處理任務會顯示在圖表 (3) 右側的區塊中,結合每個主要處理任務產生的輸出檔案,並將合併的輸出寫入 Amazon S3 (4)。

工具

工具

  • Python – 用於此模式的範例程式碼為 Python (第 3 版)。

  • SageMaker Studio – Amazon SageMaker Studio 是適用於機器學習的 Web 型整合式開發環境 (IDE),可讓您建置、訓練、偵錯、部署和監控機器學習模型。您可以在 SageMaker Studio 中使用 Jupyter 筆記本來執行 SageMaker 處理任務。

  • SageMaker 處理 – Amazon SageMaker Processing 提供簡化的資料處理工作負載執行方式。在此模式中,特徵工程程式碼會透過使用 SageMaker 處理任務大規模實作。

Code

附加的 .zip 檔案提供此模式的完整程式碼。下一節說明建置此模式架構的步驟。每個步驟都以來自附件的範例程式碼來說明。

史詩

任務描述所需的技能
存取 Amazon SageMaker Studio。

遵循 Amazon SageMaker 文件 中提供的指示,加入您AWS帳戶中的 SageMaker Studio。

資料科學家、ML 工程師
安裝 wget 公用程式。

如果您使用新的 SageMaker Studio 組態加入,或者您之前從未在 SageMaker Studio 中使用這些公用程式,請安裝 wget。 

若要安裝,請在 SageMaker Studio 主控台中開啟終端機視窗,然後執行下列命令:

sudo yum install wget
資料科學家、ML 工程師
下載並解壓縮範例程式碼。

附件區段中下載 attachments.zip 檔案。在終端機視窗中,導覽至您下載檔案的資料夾,並擷取其內容:

unzip attachment.zip

導覽至您擷取 .zip 檔案的資料夾,並擷取Scaled-Processing.zip檔案的內容。

unzip Scaled-Processing.zip
資料科學家、ML 工程師
從 physionet.org 下載範例資料集,並將其上傳至 Amazon S3。

在包含 Scaled-Processing 檔案的資料夾中執行 get_data.ipynb Jupyter 筆記本。此筆記本會從 IIIphysionet.org 下載範例MIMIC資料集,並將其上傳至 Amazon S3 中的 SageMaker Studio 工作階段儲存貯體。

資料科學家、ML 工程師
任務描述所需的技能
在所有子目錄上扁平化檔案階層。

在 MIMIC- 等大型資料集中III,檔案通常分佈於多個子目錄,即使在邏輯父群組內也是如此。您的指令碼應設定為在所有子目錄扁平化所有群組檔案,如下列程式碼所示。

# Generate list of .dat files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']

請注意,此範例程式碼片段的範例來自 檔案,該src/feature-engineering-pass1/preprocessing.py檔案提供於附件中。

資料科學家、ML 工程師
根據 vCPU 計數將檔案分割為子群組。

根據執行指令碼的執行個體上 vCPUs 存在的數目,檔案應分為大小均勻的子群組或區塊。對於此步驟,您可以實作類似下列的程式碼。

# Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))
資料科學家、ML 工程師
平行處理跨 的子群組vCPUs。

指令碼邏輯應設定為平行處理所有子群組。若要這麼做,請使用 Joblib 程式庫的Parallel 類別和delayed 方法,如下所示。 

# Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
資料科學家、ML 工程師
將單一檔案群組輸出儲存至 Amazon S3。

平行 vCPU 處理完成時,應合併每個 vCPU 的結果,並上傳至檔案群組的 S3 儲存貯體路徑。對於此步驟,您可以使用類似下列的程式碼。

# Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))
資料科學家、ML 工程師
任務描述所需的技能
合併執行第一個指令碼的所有處理任務所產生的資料檔案。

先前的指令碼會為每個 SageMaker 處理任務輸出單一檔案,該任務會從資料集處理一組檔案。 接下來,您需要將這些輸出檔案合併為單一物件,並將單一輸出資料集寫入 Amazon S3。這在附件中提供src/feature-engineering-pass1p5/preprocessing.py的檔案中示範,如下所示。

def write_parquet(wavs_df, path):     """     Write waveform summary dataframe to S3 in parquet format.          :param wavs_df: waveform summary dataframe     :param path: S3 directory prefix     :type wavs_df: pandas dataframe     :type path: str     :return: None     """     extra_args = {"ServerSideEncryption": "aws:kms"}     wr.s3.to_parquet(         df=wavs_df,         path=path,         compression='snappy',         s3_additional_kwargs=extra_args)     def combine_data():     """     Get combined data and write to parquet.          :return: waveform summary dataframe     :rtype: pandas dataframe     """     wavs_df = get_data()     wavs_df = normalize_signal_names(wavs_df)     write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))       return wavs_df     wavs_df = combine_data()
資料科學家、ML 工程師
任務描述所需的技能
執行第一個處理任務。

若要執行巨集分割,請為每個檔案群組執行個別的處理任務。Microsharding 會在每個處理任務內執行,因為每個任務都會執行您的第一個指令碼。下列程式碼示範如何在下列程式碼片段 (包含在 中) 中為每個檔案群組目錄啟動處理任務notebooks/FeatExtract_Pass1.ipynb

pat_groups = list(range(30,40)) ts = str(int(time.time()))   for group in pat_groups:     sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                      role=role,                                      instance_type='ml.m5.4xlarge',                                      instance_count=1,                                      volume_size_in_gb=5)     sklearn_processor.run(         code='../src/feature-engineering-pass1/preprocessing.py',         job_name='-'.join(['scaled-processing-p1', str(group), ts]),         arguments=[             "input_path", "/opt/ml/processing/input",             "output_path", "/opt/ml/processing/output",             "group_data_out", "ws_df_group.json"         ],         inputs=         [             ProcessingInput(                 source=f's3://{sess.default_bucket()}/data_inputs/{group}',                 destination='/opt/ml/processing/input',                 s3_data_distribution_type='FullyReplicated'             )         ],         outputs=         [             ProcessingOutput(                 source='/opt/ml/processing/output',                 destination=f's3://{sess.default_bucket()}/data_outputs/{group}'             )         ],         wait=False     )
資料科學家、ML 工程師
執行第二個處理任務。

若要合併第一組處理任務產生的輸出,並執行任何額外的預先處理運算,您可以使用單一 SageMaker 處理任務執行第二個指令碼。下列程式碼示範 (包含在 中notebooks/FeatExtract_Pass1p5.ipynb)。

ts = str(int(time.time())) bucket = sess.default_bucket()       sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                  role=role,                                  instance_type='ml.t3.2xlarge',                                  instance_count=1,                                  volume_size_in_gb=5) sklearn_processor.run(     code='../src/feature-engineering-pass1p5/preprocessing.py',     job_name='-'.join(['scaled-processing', 'p1p5', ts]),     arguments=['bucket', bucket,                'pass1out_prefix', 'data_outputs',                'pass1out_data', 'ws_df_group.json',                'pass1p5out_data', 'waveform_summary.parquet',                'statsdata_name', 'signal_stats.csv'],     wait=True )
資料科學家、ML 工程師

相關資源

附件

若要存取與本文件相關聯的其他內容,請解壓縮下列檔案: attachment.zip