Use o SageMaker processamento para engenharia de recursos distribuídos de conjuntos de dados de ML em escala de terabytes - Recomendações da AWS

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Use o SageMaker processamento para engenharia de recursos distribuídos de conjuntos de dados de ML em escala de terabytes

Criado por Chris Boomhower (AWS)

Ambiente: produção

Tecnologias: machine learning e IA; big data

Serviços da AWS: Amazon SageMaker

Resumo

Muitos conjuntos de dados em escala de terabytes ou maiores geralmente consistem em uma estrutura hierárquica de pastas e os arquivos no conjunto de dados às vezes compartilham interdependências. Por esse motivo, engenheiros de machine learning (ML) e cientistas de dados devem tomar decisões de design ponderadas para preparar esses dados para treinamento e inferência de modelos. Esse padrão demonstra como você pode usar técnicas manuais de macrofragmentação e microfragmentação em combinação com o Amazon SageMaker Processing e a paralelização de CPU virtual (vCPU) para escalar com eficiência os processos de engenharia de recursos para conjuntos de dados ML de big data complicados. 

Esse padrão define macrofragmentação como a divisão de diretórios de dados em várias máquinas para processamento e microfragmentação como a divisão de dados em cada máquina em vários segmentos de processamento. O padrão demonstra essas técnicas usando a Amazon SageMaker com amostras de registros de formas de onda de séries temporais do conjunto de dados MIMIC-III. PhysioNet Ao implementar as técnicas nesse padrão, você pode minimizar o tempo de processamento e os custos da engenharia de atributos e, ao mesmo tempo, maximizar a utilização dos recursos e a eficiência de throughput. Essas otimizações dependem do SageMaker processamento distribuído em instâncias do Amazon Elastic Compute Cloud (Amazon EC2) e vCPUs para conjuntos de dados grandes e semelhantes, independentemente do tipo de dados.

Pré-requisitos e limitações

Pré-requisitos

  • Acesso às instâncias do SageMaker notebook ou ao SageMaker Studio, se você quiser implementar esse padrão para seu próprio conjunto de dados. Se você estiver usando a Amazon SageMaker pela primeira vez, consulte Comece a usar a Amazon SageMaker na documentação da AWS.

  • SageMaker Studio, se você quiser implementar esse padrão com os dados de amostra do PhysioNet MIMIC-III

  • O padrão usa SageMaker Processing, mas não exige nenhuma experiência na execução de trabalhos SageMaker de Processing.

Limitações

  • Esse padrão é adequado para conjuntos de dados de ML que incluem arquivos interdependentes. Essas interdependências são as que mais se beneficiam da fragmentação manual de macros e da execução paralela de várias tarefas de processamento de instância única SageMaker . Para conjuntos de dados em que essas interdependências não existem, o ShardedByS3Key recurso em SageMaker Processing pode ser uma alternativa melhor à macrofragmentação, pois envia dados fragmentados para várias instâncias que são gerenciadas pela mesma tarefa de Processamento. No entanto, você pode implementar a estratégia de microfragmentação desse padrão em ambos os cenários para melhor utilizar as vCPUs de instância.

Versões do produto

  • SDK do Amazon SageMaker Python versão 2

Arquitetura

Pilha de tecnologias de destino

  • Amazon Simple Storage Service (Amazon S3)

  • Amazon SageMaker

Arquitetura de destino

Macrofragmentação e instâncias EC2 distribuídas

Os 10 processos paralelos representados nessa arquitetura refletem a estrutura do conjunto de dados MIMIC-III. (Os processos são representados por elipses para simplificar o diagrama.) Uma arquitetura semelhante se aplica a qualquer conjunto de dados quando você usa macrofragmentação manual. No caso do MIMIC-III, você pode usar a estrutura bruta do conjunto de dados a seu favor, processando cada pasta de grupo de pacientes separadamente, com o mínimo esforço. No diagrama a seguir, o bloco de grupos de registros aparece à esquerda (1). Dada a natureza distribuída dos dados, faz sentido fragmenta-los por grupo de pacientes.

Arquitetura para microfragmentação e instâncias EC2 distribuídas

No entanto, a fragmentação manual por grupo de pacientes significa que uma tarefa de processamento separada é necessária para cada pasta de grupo de pacientes, como você pode ver na seção central do diagrama (2), em vez de uma única tarefa de processamento com várias instâncias do EC2. Como os dados do MIMIC-III incluem arquivos de forma de onda binários e arquivos de cabeçalho baseados em texto correspondentes, e há uma dependência necessária da biblioteca wfdb para extração de dados binários, todos os registros de um paciente específico devem ser disponibilizados na mesma instância. A única maneira de garantir que o arquivo de cabeçalho associado a cada arquivo de forma de onda binária também esteja presente é implementar a fragmentação manual para executar cada fragmento em seu próprio trabalho de processamento e especificar s3_data_distribution_type='FullyReplicated' quando você define a entrada do trabalho de processamento. Como alternativa, se todos os dados estivessem disponíveis em um único diretório e não existissem dependências entre os arquivos, uma opção mais adequada seria iniciar uma única tarefa de processamento com várias instâncias do EC2 e s3_data_distribution_type='ShardedByS3Key' especificados. A especificação ShardedByS3Key  do tipo de distribuição de dados do Amazon S3 SageMaker direciona o gerenciamento automático da fragmentação de dados entre instâncias. 

Iniciar uma tarefa de processamento para cada pasta é uma forma econômica de pré-processar os dados, pois a execução simultânea de várias instâncias economiza tempo. Para economizar custos e tempo adicionais, você pode usar a microfragmentação em cada tarefa de processamento. 

Microfragmentação e vCPUs paralelas

Em cada tarefa de processamento, os dados agrupados são divididos ainda mais para maximizar o uso de todas as vCPUs disponíveis na instância EC2 totalmente SageMaker gerenciada. Os blocos na seção central do diagrama (2) mostram o que acontece em cada tarefa de processamento principal. O conteúdo das pastas de registros do paciente é nivelado e dividido uniformemente com base no número de vCPUs disponíveis na instância. Depois que o conteúdo da pasta é dividido, o conjunto de arquivos de tamanho uniforme é distribuído em todas as vCPUs para processamento. Quando o processamento é concluído, os resultados de cada vCPU são combinados em um único arquivo de dados para cada tarefa de processamento. 

No código em anexo, esses conceitos são representados na seção a seguir do arquivo src/feature-engineering-pass1/preprocessing.py.

def chunks(lst, n):     """     Yield successive n-sized chunks from lst.          :param lst: list of elements to be divided     :param n: number of elements per chunk     :type lst: list     :type n: int     :return: generator comprising evenly sized chunks     :rtype: class 'generator'     """     for i in range(0, len(lst), n):         yield lst[i:i + n]     # Generate list of data files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']   # Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))   # Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)   # Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))

Uma função, chunks, é definida primeiro para consumir uma determinada lista dividindo-a em partes de tamanho uniforme e retornando esses resultados como um gerador. Em seguida, os dados são agrupados nas pastas dos pacientes compilando uma lista de todos os arquivos binários de forma de onda que estão presentes. Depois disso, o número de vCPUs disponíveis na instância do EC2 é obtido. A lista de arquivos de forma de onda binária é dividida uniformemente entre essas vCPUs por meio de uma chamada de chunks, em seguida, cada sublista de forma de onda é processada em sua própria vCPU usando a classe Parallel do joblib. Os resultados são combinados automaticamente em uma única lista de dataframes pelo trabalho de processamento, que SageMaker então processa ainda mais antes de gravá-los no Amazon S3 após a conclusão do trabalho. Neste exemplo, há 10 arquivos gravados no Amazon S3 pelos trabalhos de processamento (um para cada trabalho).

Quando todas as tarefas de processamento iniciais estiverem concluídas, uma tarefa de processamento secundária, mostrada no bloco à direita do diagrama (3), combina os arquivos de saída produzidos por cada tarefa de processamento principal e grava a saída combinada no Amazon S3 (4).

Ferramentas

Ferramentas

  • Python: o código de amostra usado para esse padrão é Python (versão 3).

  • SageMaker Studio — O Amazon SageMaker Studio é um ambiente de desenvolvimento integrado (IDE) baseado na web para aprendizado de máquina que permite criar, treinar, depurar, implantar e monitorar seus modelos de aprendizado de máquina. Você executa trabalhos SageMaker de processamento usando notebooks Jupyter dentro do Studio. SageMaker

  • SageMaker Processamento — O Amazon SageMaker Processing fornece uma forma simplificada de executar suas cargas de trabalho de processamento de dados. Nesse padrão, o código de engenharia de recursos é implementado em escala usando trabalhos SageMaker de processamento.

Código

O arquivo .zip anexado fornece o código completo desse padrão. A seção a seguir descreve as etapas para criar a arquitetura para esse padrão. Cada etapa é ilustrada pelo código de amostra do anexo.

Épicos

TarefaDescriçãoHabilidades necessárias
Acesse o Amazon SageMaker Studio.

Integre-se ao SageMaker Studio em sua conta da AWS seguindo as instruções fornecidas na SageMaker documentação da Amazon.

Cientista de dados, engenheiro de ML
Instale o utilitário wget.

Instale o wget se você embarcou com uma nova configuração do SageMaker Studio ou se nunca usou esses utilitários no Studio antes. SageMaker  

Para instalar, abra uma janela de terminal no console do SageMaker Studio e execute o seguinte comando:

sudo yum install wget
Cientista de dados, engenheiro de ML
Faça download e descompacte o código de exemplo.

Faça o download do arquivo attachments.zip na seção Anexos. Em uma janela de terminal, navegue até a pasta em que você baixou o arquivo e extraia seu conteúdo:

unzip attachment.zip

Navegue até a pasta em que você extraiu o arquivo .zip e extraia o conteúdo do arquivo Scaled-Processing.zip.

unzip Scaled-Processing.zip
Cientista de dados, engenheiro de ML
Faça o download do conjunto de dados de amostra em physionet.org e faça o upload para o Amazon S3.

Execute o caderno Jupyter get_data.ipynb dentro da pasta que contém os arquivos Scaled-Processing. Este notebook baixa uma amostra do conjunto de dados MIMIC-III do physionet.org e a carrega em seu bucket de sessão do Studio no Amazon S3. SageMaker

Cientista de dados, engenheiro de ML
TarefaDescriçãoHabilidades necessárias
Nivele a hierarquia de arquivos em todos os subdiretórios.

Em grandes conjuntos de dados, como o MIMIC-III, os arquivos geralmente são distribuídos em vários subdiretórios, mesmo dentro de um grupo pai lógico. Seu script deve ser configurado para nivelar todos os arquivos do grupo em todos os subdiretórios, conforme demonstra o código a seguir.

# Generate list of .dat files on machine data_dir = input_dir d_subs = next(os.walk(os.path.join(data_dir, '.')))[1] file_list = [] for ds in d_subs:     file_list.extend(os.listdir(os.path.join(data_dir, ds, '.'))) dat_list = [os.path.join(re.split('_|\.', f)[0].replace('n', ''), f[:-4]) for f in file_list if f[-4:] == '.dat']

Observação    os trechos de código de exemplo neste épico são do arquivo src/feature-engineering-pass1/preprocessing.py, que é fornecido no anexo.

Cientista de dados, engenheiro de ML
Divida os arquivos em subgrupos com base na contagem de vCPUs.

Os arquivos devem ser divididos em subgrupos ou partes de tamanho uniforme, dependendo do número de vCPUs presentes na instância que executa o script. Para esta etapa, você pode implementar um código semelhante ao seguinte.

# Split list of files into sub-lists cpu_count = multiprocessing.cpu_count() splits = int(len(dat_list) / cpu_count) if splits == 0: splits = 1 dat_chunks = list(chunks(dat_list, splits))
Cientista de dados, engenheiro de ML
Paralelize o processamento de subgrupos em vCPUs.

A lógica do script deve ser configurada para processar todos os subgrupos em paralelo. Para fazer isso, use a classe Parallel  e o método delayed  da biblioteca Joblib da seguinte forma. 

# Parallelize processing of sub-lists across CPUs ws_df_list = Parallel(n_jobs=-1, verbose=0)(delayed(run_process)(dc) for dc in dat_chunks)
Cientista de dados, engenheiro de ML
Salve a saída de um único grupo de arquivos no Amazon S3.

Quando o processamento paralelo da vCPU estiver concluído, os resultados de cada vCPU deverão ser combinados e enviados para o caminho do bucket S3 do grupo de arquivos. Para esta etapa, você pode usar um código semelhante ao seguinte.

# Compile and pickle patient group dataframe ws_df_group = pd.concat(ws_df_list) ws_df_group = ws_df_group.reset_index().rename(columns={'index': 'signal'}) ws_df_group.to_json(os.path.join(output_dir, group_data_out))
Cientista de dados, engenheiro de ML
TarefaDescriçãoHabilidades necessárias
Combine arquivos de dados produzidos em todas as tarefas de processamento que executaram o primeiro script.

O script anterior gera um único arquivo para cada tarefa de SageMaker processamento que processa um grupo de arquivos do conjunto de dados.  Em seguida, você precisa combinar esses arquivos de saída em um único objeto e gravar um único conjunto de dados de saída no Amazon S3. Isso é demonstrado no arquivo src/feature-engineering-pass1p5/preprocessing.py, que é fornecido no anexo, da seguinte forma.

def write_parquet(wavs_df, path):     """     Write waveform summary dataframe to S3 in parquet format.          :param wavs_df: waveform summary dataframe     :param path: S3 directory prefix     :type wavs_df: pandas dataframe     :type path: str     :return: None     """     extra_args = {"ServerSideEncryption": "aws:kms"}     wr.s3.to_parquet(         df=wavs_df,         path=path,         compression='snappy',         s3_additional_kwargs=extra_args)     def combine_data():     """     Get combined data and write to parquet.          :return: waveform summary dataframe     :rtype: pandas dataframe     """     wavs_df = get_data()     wavs_df = normalize_signal_names(wavs_df)     write_parquet(wavs_df, "s3://{}/{}/{}".format(bucket_xform, dataset_prefix, pass1p5out_data))       return wavs_df     wavs_df = combine_data()
Cientista de dados, engenheiro de ML
TarefaDescriçãoHabilidades necessárias
Execute a primeira tarefa de processamento.

Para realizar a fragmentação de macros, execute uma tarefa de processamento separada para cada grupo de arquivos. A microfragmentação é executada dentro de cada tarefa de processamento, porque cada tarefa executa seu primeiro script. O código a seguir demonstra como iniciar uma tarefa de processamento para cada diretório de grupo de arquivos no trecho a seguir (incluído em notebooks/FeatExtract_Pass1.ipynb).

pat_groups = list(range(30,40)) ts = str(int(time.time()))   for group in pat_groups:     sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                      role=role,                                      instance_type='ml.m5.4xlarge',                                      instance_count=1,                                      volume_size_in_gb=5)     sklearn_processor.run(         code='../src/feature-engineering-pass1/preprocessing.py',         job_name='-'.join(['scaled-processing-p1', str(group), ts]),         arguments=[             "input_path", "/opt/ml/processing/input",             "output_path", "/opt/ml/processing/output",             "group_data_out", "ws_df_group.json"         ],         inputs=         [             ProcessingInput(                 source=f's3://{sess.default_bucket()}/data_inputs/{group}',                 destination='/opt/ml/processing/input',                 s3_data_distribution_type='FullyReplicated'             )         ],         outputs=         [             ProcessingOutput(                 source='/opt/ml/processing/output',                 destination=f's3://{sess.default_bucket()}/data_outputs/{group}'             )         ],         wait=False     )
Cientista de dados, engenheiro de ML
Execute a segunda tarefa de processamento.

Para combinar as saídas geradas pelo primeiro conjunto de trabalhos de processamento e realizar quaisquer cálculos adicionais para pré-processamento, você executa seu segundo script usando um único SageMaker trabalho de processamento. O código a seguir demonstra isso (incluído em notebooks/FeatExtract_Pass1p5.ipynb).

ts = str(int(time.time())) bucket = sess.default_bucket()       sklearn_processor = SKLearnProcessor(framework_version='0.20.0',                                  role=role,                                  instance_type='ml.t3.2xlarge',                                  instance_count=1,                                  volume_size_in_gb=5) sklearn_processor.run(     code='../src/feature-engineering-pass1p5/preprocessing.py',     job_name='-'.join(['scaled-processing', 'p1p5', ts]),     arguments=['bucket', bucket,                'pass1out_prefix', 'data_outputs',                'pass1out_data', 'ws_df_group.json',                'pass1p5out_data', 'waveform_summary.parquet',                'statsdata_name', 'signal_stats.csv'],     wait=True )
Cientista de dados, engenheiro de ML

Recursos relacionados

Anexos

Para acessar o conteúdo adicional associado a este documento, descompacte o seguinte arquivo: attachment.zip