SageMaker 処理を使用してテラバイト規模の ML データセットの分散特徴量エンジニアリングを行う - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

SageMaker 処理を使用してテラバイト規模の ML データセットの分散特徴量エンジニアリングを行う

クリス・ブームハワー(AWS)によって作成された

環境:本稼働

テクノロジー:機械学習と AI、ビッグデータ

AWS サービス: Amazon SageMaker

[概要]

テラバイト規模またはそれ以上のデータセットの多くは階層的なフォルダ構造で構成されており、データセット内のファイルは相互に依存している場合があります。このため、機械学習 (ML) エンジニアとデータサイエンティストは、モデルトレーニングや推論のためにデータを準備するために、慎重に設計上の決定を下さなければなりません。このパターンは、手動マクロシャーディングとマイクロシャーディング技術を Amazon SageMaker Processing および仮想 CPU (vCPU) 並列化と組み合わせて、複雑なビッグデータ ML データセットの特徴量エンジニアリングプロセスを効率的にスケーリングする方法を示しています。 

このパターンでは、データディレクトリを複数のマシンに分割して処理することを「マクロシャーディング」と定義し、「マイクロシャーディング」は各マシンのデータを複数の処理スレッドに分割することと定義されています。このパターンは、PhysioNet MIMIC-TAK データセットのサンプル時系列結合レコード SageMaker で Amazon を使用することによって、これらの手法を示しています。このパターンの手法を実装することで、リソース利用率とスループット効率を最大化しながら、特徴量エンジニアリングの処理時間とコストを最小限に抑えることができます。これらの最適化は、データ型に関係なく、類似した大規模なデータセットの Amazon Elastic Compute Cloud (Amazon EC2) インスタンスと vCPUsでの分散 SageMaker 処理に依存しています。

前提条件と制限

前提条件

  • 独自のデータセットにこのパターンを実装する場合は、 SageMaker ノートブックインスタンスまたは SageMaker Studio にアクセスします。Amazon を初めて使用する場合は、AWS ドキュメントの「Amazon の開始 SageMaker方法 SageMaker 」を参照してください。

  • SageMaker Studio、PhysioNet MIMIC-TAK サンプルデータを使用してこのパターンを実装する場合。 

  • このパターンでは SageMaker Processing を使用していますが、 SageMaker Processing ジョブの実行経験は必要ありません。

制約事項

  • このパターンは、相互に依存するファイルを含む ML データセットに非常に適しています。これらの相互依存関係は、手動マクロシャーディングと複数の単一インスタンス SageMaker 処理ジョブを並行して実行することで最大の利点を得られます。このような相互依存関係が存在しないデータセットでは、 SageMaker 同じ処理ジョブによって管理される複数のインスタンスにシャーディングされたデータを送信するため、処理ShardedByS3Keyの機能がマクロシャーディングの代替となる可能性があります。ただし、このパターンのマイクロシャーディング戦略をどちらのシナリオでも実装して、インスタンス vCPUs を最大限に活用できます。

製品バージョン

  • Amazon SageMaker Python SDK バージョン 2

アーキテクチャ

ターゲットテクノロジースタック

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon SageMaker

ターゲット アーキテクチャ

マクロシャーディングと分散型 EC2 インスタンス

このアーキテクチャで表される 10 個のparallel プロセスは、MIMIC-III データセットの構造を反映しています。(図を簡略化するため、プロセスは楕円で示されています)。手動マクロシャーディングを使用する場合も、同様のアーキテクチャがどのデータセットにも適用されます。MIMIC-III の場合、各患者グループフォルダーを最小の労力で個別に処理することで、データセットの未加工の構造を活用できます。以下の図では、レコードグループブロックが左側に表示されています (1)。データが分散されていることを考えると、患者グループごとにシャードするのは理にかなっています。

マイクロシャーディングと分散 EC2 インスタンスのアーキテクチャ

ただし、患者グループごとに手動でシャーディングを行うと、図 (2) の中央のセクションでわかるように、複数の EC2 インスタンスによる単一の処理ジョブではなく、患者グループフォルダごとに個別の処理ジョブが必要になります。MIMIC-III のデータには、バイナリ波形ファイルと対応するテキストベースのヘッダーファイルの両方が含まれており、バイナリデータ抽出には「wfdb ライブラリ」に依存する必要があるため、特定の患者のすべてのレコードを同じインスタンスで利用できるようにする必要があります。各バイナリ波形ファイルの関連ヘッダーファイルも確実に存在させる唯一の方法は、各シャードを独自の処理ジョブ内で実行する手動シャーディングを実装し、処理ジョブの入力を定義するときにs3_data_distribution_type='FullyReplicated'を指定することです。あるいは、すべてのデータが単一のディレクトリにあり、ファイル間に依存関係がない場合、より適切なオプションは、複数の EC2 インスタンスとs3_data_distribution_type='ShardedByS3Key'を指定して単一の処理ジョブを起動することかもしれません。Amazon S3 データ分散タイプShardedByS3Key として を指定すると、 はインスタンス間でデータシャーディングを自動的に管理 SageMaker するように に指示します。 

複数のインスタンスを同時に実行すると時間を節約できるため、データを前処理するにはフォルダごとに処理ジョブを起動するのがコスト効率の高い方法です。コストと時間をさらに節約するために、各処理ジョブ内でマイクロシャーディングを使用することもできます。 

マイクロシャーディングとparallel vCPUs

各処理ジョブ内では、グループ化されたデータをさらに分割して、 SageMaker フルマネージド EC2 インスタンスで使用可能なすべての vCPUsを最大限に活用します。図の中央のセクション (2) のブロックは、各主要処理ジョブ内で何が起こるかを表しています。患者記録フォルダーの内容は、インスタンスで使用可能なvCPUs の数に基づいてフラット化され、均等に分割されます。フォルダの内容が分割されると、同じサイズのファイルセットがすべての vCPUs に分散されて処理されます。処理が完了すると、各 vCPU の結果は、処理ジョブごとに 1 つのデータファイルにまとめられます。 

添付のコードでは、これらの概念がsrc/feature-engineering-pass1/preprocessing.pyファイルの次のセクションに示されています。

def chunks(lst, n):     """     Yield successive n-sized chunks from lst.          :param lst: list of elements to be divided     :param n: number of elements per chunk     :type lst: list     :type n: int     :return: generator comprising evenly sized chunks     :rtype: class 'generator'     """     for i in range(0, len(lst), n):         yield lst[i:i + n]     # Generate list of data files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']   # Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))   # Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)   # Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))

最初に関数chunksを定義し、与えられたリストをの長さの均一な大きさのブロックに分割し、その結果をジェネレータとして返すことにより、与えられたリストを使用します。次に、存在するすべてのバイナリ波形ファイルのリストをコンパイルして、データを患者フォルダー全体でフラット化します。これが完了すると、EC2 インスタンスで使用可能な vCPUs の数が取得されます。バイナリ波形ファイルのリストは、chunksを呼び出すことによってこれらのvCPUに均等に分割され、その後、「joblibのParallelクラス」を使用することによって、各波形サブリストがそれぞれのvCPUで処理されます。結果は、処理ジョブによってデータフレームの単一のリストに自動的に結合 SageMaker され、ジョブの完了時に Amazon S3 に書き込まれる前にさらに処理されます。この例では、処理ジョブによって Amazon S3 に書き込まれるファイルが 10 個あります (ジョブごとに 1 つ)。

最初の処理ジョブがすべて完了すると、図 (3) の右側のブロックに示されているセカンダリ処理ジョブが、各プライマリ処理ジョブによって生成された出力ファイルを結合し、結合された出力を Amazon S3 (4) に書き込みます。

ツール

ツール

  • Python — このパターンに使用されるサンプルコードは Python (バージョン 3) です。

  • SageMaker Studio – Amazon SageMaker Studio は、機械学習モデルの構築、トレーニング、デバッグ、デプロイ、モニタリングを可能にする、機械学習用のウェブベースの統合開発環境 (IDE) です。 SageMaker Studio 内で Jupyter Notebook を使用して SageMaker 処理ジョブを実行します。

  • SageMaker 処理 – Amazon SageMaker Processing は、データ処理ワークロードを簡単に実行する方法を提供します。このパターンでは、特徴量エンジニアリングコードは SageMaker 処理ジョブを使用して大規模に実装されます。

Code

添付の.zip ファイルには、このパターンの完全なコードが記載されています。次のセクションでは、このパターンのアーキテクチャを構築する手順について説明します。各ステップは、添付ファイルのサンプルコードで説明されています。

エピック

タスク説明必要なスキル
Amazon SageMaker Studio にアクセスします。

Amazon SageMaker ドキュメント に記載されている手順に従って、AWS アカウントの SageMaker Studio にオンボードします。

データサイエンティスト、ML エンジニア
wget ユーティリティをインストールします。

新しい SageMaker Studio 設定でオンボーディングした場合、または Studio でこれらのユーティリティを使用したことがない場合は、wget SageMaker をインストールします。 

インストールするには、 SageMaker Studio コンソールでターミナルウィンドウを開き、次のコマンドを実行します。

sudo yum install wget
データサイエンティスト、ML エンジニア
サンプルコードをダウンロードして解凍します。

添付ファイル」セクションからattachments.zipファイルをダウンロードします。ターミナルウィンドウで、ファイルをダウンロードしたフォルダに移動し、その内容を抽出します。

unzip attachment.zip

Scaled-Processing.zipファイルを抽出したフォルダに移動し、ファイルの内容を抽出します。

unzip Scaled-Processing.zip
データサイエンティスト、ML エンジニア
physionet.org からサンプルデータセットをダウンロードし、Amazon S3 にアップロードします。

Scaled-Processingファイルが含まれているフォルダ内で get_data.ipynb Jupyter Notebookを実行します。このノートブックでは、physionet.org からサンプル MIMIC-TAK データセットをダウンロードし、Amazon S3 の SageMaker Studio セッションバケットにアップロードします。

データサイエンティスト、ML エンジニア
タスク説明必要なスキル
すべてのサブディレクトリにわたってファイル階層をフラット化する。

MIMIC-III のような大規模なデータセットでは、論理的な親グループ内であってもファイルが複数のサブディレクトリに分散されることがよくあります。次のコードが示すように、スクリプトはすべてのサブディレクトリにあるすべてのグループファイルをフラット化するように設定する必要があります。

# Generate list of .dat files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']

注:このエピックのサンプルコードスニペットは、添付ファイルにあるsrc/feature-engineering-pass1/preprocessing.pyファイルからのものです。

データサイエンティスト、ML エンジニア
vCPU 数に基づいてファイルをサブグループに分割します。

ファイルは、スクリプトを実行するインスタンスに存在するvCPUs の数に応じて、同じサイズのサブグループまたはチャンクに分割する必要があります。このステップでは、次のようなコードを実装できます。

# Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))
データサイエンティスト、ML エンジニア
vCPUs 間のサブグループの処理を並列化します。

スクリプトロジックは、すべてのサブグループを並行して処理するように設定する必要があります。そのためには、Joblib ライブラリのParallel クラスとdelayed メソッドを次のように使用します。 

# Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
データサイエンティスト、ML エンジニア
Amazon S3 に単一ファイルグループの出力を保存します。

並列 vCPU 処理が完了したら、各 vCPU の結果を組み合わせて、ファイルグループの S3 バケットパスにアップロードする必要があります。このステップでは、次のようなコードを実行できます。

# Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))
データサイエンティスト、ML エンジニア
タスク説明必要なスキル
最初のスクリプトを実行したすべての処理ジョブで生成されたデータファイルを結合します。

前のスクリプトは、データセットのファイルのグループを処理する SageMaker 処理ジョブごとに 1 つのファイルを出力します。 次に、これらの出力ファイルを 1 つのオブジェクトに結合し、1 つの出力データセットを Amazon S3 に書き込む必要があります。これは、添付ファイルにあるsrc/feature-engineering-pass1p5/preprocessing.pyファイルに次のように示されています。

def write_parquet(wavs_df, path):     """     Write waveform summary dataframe to S3 in parquet format.          :param wavs_df: waveform summary dataframe     :param path: S3 directory prefix     :type wavs_df: pandas dataframe     :type path: str     :return: None     """     extra_args = {"ServerSideEncryption": "aws:kms"}     wr.s3.to_parquet(         df=wavs_df,         path=path,         compression='snappy',         s3_additional_kwargs=extra_args)     def combine_data():     """     Get combined data and write to parquet.          :return: waveform summary dataframe     :rtype: pandas dataframe     """     wavs_df = get_data()     wavs_df = normalize_signal_names(wavs_df)     write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))       return wavs_df     wavs_df = combine_data()
データサイエンティスト、ML エンジニア
タスク説明必要なスキル
最初の処理ジョブを実行します。

マクロシャーディングを実行するには、ファイルグループごとに個別の処理ジョブを実行します。マイクロシャーディングは各処理ジョブ内で実行されます。これは、各ジョブで最初のスクリプトが実行されるためです。次のコードは、次のスニペット (notebooks/FeatExtract_Pass1.ipynbに含まれている) 内の各ファイルグループディレクトリの処理ジョブを起動する方法を示しています。

pat_groups = list(range(30,40)) ts = str(int(time.time()))   for group in pat_groups:     sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                      role=role,                                      instance_type='ml.m5.4xlarge',                                      instance_count=1,                                      volume_size_in_gb=5)     sklearn_processor.run(         code='../src/feature-engineering-pass1/preprocessing.py',         job_name='-'.join(['scaled-processing-p1', str(group), ts]),         arguments=[             "input_path", "/opt/ml/processing/input",             "output_path", "/opt/ml/processing/output",             "group_data_out", "ws_df_group.json"         ],         inputs=         [             ProcessingInput(                 source=f's3://{sess.default_bucket()}/data_inputs/{group}',                 destination='/opt/ml/processing/input',                 s3_data_distribution_type='FullyReplicated'             )         ],         outputs=         [             ProcessingOutput(                 source='/opt/ml/processing/output',                 destination=f's3://{sess.default_bucket()}/data_outputs/{group}'             )         ],         wait=False     )
データサイエンティスト、ML エンジニア
2 つ目の処理ジョブを実行します。

最初の処理ジョブのセットによって生成された出力を結合し、前処理のために追加の計算を実行するには、1 つの SageMaker 処理ジョブを使用して 2 番目のスクリプトを実行します。次のコードはこれを示しています (notebooks/FeatExtract_Pass1p5.ipynbに含まれている)。

ts = str(int(time.time())) bucket = sess.default_bucket()       sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                  role=role,                                  instance_type='ml.t3.2xlarge',                                  instance_count=1,                                  volume_size_in_gb=5) sklearn_processor.run(     code='../src/feature-engineering-pass1p5/preprocessing.py',     job_name='-'.join(['scaled-processing', 'p1p5', ts]),     arguments=['bucket', bucket,                'pass1out_prefix', 'data_outputs',                'pass1out_data', 'ws_df_group.json',                'pass1p5out_data', 'waveform_summary.parquet',                'statsdata_name', 'signal_stats.csv'],     wait=True )
データサイエンティスト、ML エンジニア

関連リソース

添付ファイル

このドキュメントに関連する追加コンテンツにアクセスするには、次のファイルを解凍してください。「attachment.zip