Ajuste de desempenho para o Apache Airflow no Amazon MWAA - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Ajuste de desempenho para o Apache Airflow no Amazon MWAA

Este tópico descreve como ajustar o desempenho de um ambiente Amazon Managed Workflows para Apache Airflow usando. Como usar opções de configuração do Apache Airflow no Amazon MWAA

Como adicionar uma opção de configuração do Apache Airflow

O procedimento a seguir descreve as etapas para adicionar uma opção de configuração do Airflow ao seu ambiente.

  1. Abra a página Ambientes no console do Amazon MWAA.

  2. Escolha um ambiente.

  3. Selecione Editar.

  4. Escolha Próximo.

  5. Escolha Adicionar configuração personalizada no painel Opções de configuração do Airflow.

  6. Escolha uma configuração na lista suspensa e insira um valor, ou digite uma configuração personalizada e insira um valor.

  7. Escolha Adicionar configuração personalizada para cada configuração que você deseja adicionar.

  8. Escolha Salvar.

Para saber mais, consulte Como usar opções de configuração do Apache Airflow no Amazon MWAA.

Agendador do Apache Airflow

O agendador do Apache Airflow é um componente central do Apache Airflow. Um problema com o agendador pode DAGs impedir a análise e o agendamento de tarefas. Para obter mais informações sobre o ajuste do agendador do Apache Airflow, consulte Ajustar o desempenho do agendador no site de documentação do Apache Airflow.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para o agendador do Apache Airflow e casos de uso do agendador.

Apache Airflow v2
Versão Opção de configuração Padrão Descrição Caso de uso

v2

celery.sync_parallelism

1

O número de processos que o Celery Executor usa para sincronizar o estado da tarefa.

Você pode usar essa opção para evitar conflitos de fila limitando os processos que o Celery Executor usa. Por padrão, um valor é definido como 1 para evitar erros na entrega de registros de tarefas ao CloudWatch Logs. Definir o valor como 0 significa usar o número máximo de processos, mas pode causar erros ao entregar logs de tarefas.

v2

scheduler.idle_sleep_time

1

O número de segundos de espera entre o processamento consecutivo do arquivo DAG no “loop” do agendador.

Você pode usar essa opção para liberar o uso da CPU no Agendador ao aumentar o tempo de inatividade do Agendador após terminar de recuperar os resultados da análise do DAG, localizar e enfileirar tarefas e executar tarefas em fila no Executor. Aumentar esse valor consome o número de threads do Agendador executados em um ambiente em scheduler.parsing_processes para o Apache Airflow v2 e em scheduler.max_threads para o Apache Airflow v1. Isso pode reduzir a capacidade de análise DAGs dos Agendadores e aumentar o tempo necessário DAGs para aparecer no servidor Web.

v2

scheduler.max_dagruns_to_create_per_loop

10

O número máximo de DAGs a serem criados DagRunspor “loop” do Scheduler.

Você pode usar essa opção para liberar recursos para agendar tarefas diminuindo o número máximo de DagRunspara o “loop” do Agendador.

v2

scheduler.parsing_processes

Defina usando a seguinte fórmula: (2 * number of vCPUs) - 1 por padrão.

O número de threads que o Scheduler pode executar paralelamente ao agendamento DAGs.

Você pode usar essa opção para liberar recursos diminuindo o número de processos que o Scheduler executa paralelamente para analisar. DAGs Recomendamos manter esse número baixo se a análise do DAG estiver afetando o agendamento de tarefas. Você deve especificar um valor menor que a contagem de vCPUs em seu ambiente. Para saber mais, consulte Limites.

Limites

Esta seção descreve os limites que você deve considerar ao ajustar os parâmetros padrão do agendador.

scheduler.parsing_processes, scheduler.max_threads

Dois threads são permitidos por vCPU para uma classe de ambiente. Pelo menos um thread deve ser reservado para o agendador de uma classe de ambiente. Se você notar um atraso no agendamento de tarefas, talvez seja necessário aumentar sua classe de ambiente. Por exemplo, um ambiente grande tem uma instância de contêiner Fargate de 4 vCPUs para seu agendador. Isso significa que um máximo total de 7 threads está disponível para uso em outros processos. Ou seja, dois threads multiplicaram quatro vCPUs, menos um para o próprio agendador. O valor que você especifica em scheduler.max_threads e não scheduler.parsing_processes deve exceder o número de threads disponíveis para uma classe de ambiente (conforme mostrado a seguir):

  • mw1.small: não deve exceder 1 thread para outros processos. O thread restante é reservado para o Agendador.

  • mw1.medium: não deve exceder 3 threads de outros processos. O thread restante é reservado para o Agendador.

  • mw1.large: não deve exceder 7 threads de outros processos. O thread restante é reservado para o Agendador.

Pastas do DAG

O Apache Airflow Scheduler verifica continuamente a DAGs pasta em seu ambiente. Quaisquer arquivos plugins.zip contidos ou arquivo Python (.py) contendo instruções de importação “airflow”. Todos os objetos Python DAG resultantes são então colocados em um arquivo DagBagpara que esse arquivo seja processado pelo Agendador para determinar quais tarefas, se houver, precisam ser agendadas. A análise do arquivo Dag ocorre independentemente de os arquivos conterem objetos DAG viáveis.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para a DAGs pasta e seus casos de uso.

Apache Airflow v2
Versão Opção de configuração Padrão Descrição Caso de uso

v2

scheduler.dag_dir_list_interval

300 segundos

O número de segundos em que a DAGs pasta deve ser examinada em busca de novos arquivos.

Você pode usar essa opção para liberar recursos aumentando o número de segundos para analisar a DAGs pasta. Recomendamos aumentar esse valor se você estiver vendo longos tempos de análisetotal_parse_time metrics, o que pode ser devido a um grande número de arquivos na sua DAGs pasta.

v2

scheduler.min_file_process_interval

30 segundos

O número de segundos após os quais o agendador analisa um DAG e as atualizações do DAG são refletidas.

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos que o agendador espera antes de analisar um DAG. Por exemplo, se você especificar um valor de 30, o arquivo DAG será analisado a cada 30 segundos. Recomendamos manter esse número alto para diminuir o uso da CPU em seu ambiente.

Arquivos DAG

Como parte do loop do agendador do Apache Airflow, arquivos DAG individuais são analisados para extrair objetos do DAG Python. No Apache Airflow v2 e superior, o agendador analisa um número máximo de processos de análise ao mesmo tempo. O número de segundos especificado em scheduler.min_file_process_interval deve passar antes que o mesmo arquivo seja analisado novamente.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para arquivos DAG do Apache Airflow e os casos de uso deles.

Apache Airflow v2
Versão Opção de configuração Padrão Descrição Caso de uso

v2

core.dag_file_processor_timeout

50 segundos

O número de segundos antes do DagFileProcessortempo limite de processamento de um arquivo DAG.

Você pode usar essa opção para liberar recursos aumentando o tempo necessário antes que o tempo limite seja DagFileProcessoratingido. Recomendamos aumentar esse valor se você estiver vendo tempos limite em seus registros de processamento do DAG que resultam em nenhum carregamento viável DAGs .

v2

core.dagbag_import_timeout

30 segundos

O tempo limite do número de segundos antes de importar um arquivo do Python.

Você pode usar essa opção para liberar recursos ao aumentar o tempo necessário até que o Agendador atinja o tempo limite ao importar um arquivo Python para extrair os objetos DAG. Essa opção é processada como parte do “loop” do Agendador e deve conter um valor menor que o valor especificado em core.dag_file_processor_timeout.

v2

core.min_serialized_dag_update_interval

30

O número mínimo de segundos após o qual os serializados DAGs no banco de dados são atualizados.

Você pode usar essa opção para liberar recursos aumentando o número de segundos após os quais os serializados DAGs no banco de dados são atualizados. Recomendamos aumentar esse valor se você tiver um número DAGs grande ou complexo DAGs. Aumentar esse valor reduz a carga no Scheduler e no banco de dados à medida que DAGs são serializados.

v2

core.min_serialized_dag_fetch_interval

10

O número de segundos em que um DAG serializado é recuperado do banco de dados quando já está carregado no. DagBag

Você pode usar essa opção para liberar recursos ao aumentar o número de segundos em que um DAG serializado é recuperado. O valor deve ser maior que o valor especificado em core.min_serialized_dag_update_interval para reduzir as taxas de “gravação” do banco de dados. Aumentar esse valor reduz a carga no servidor Web e no banco de dados à medida que DAGs são serializados.

Tarefas

Tanto o agendador quanto os operadores do Apache Airflow estão envolvidos em tarefas de enfileiramento e desenfileiramento. O agendador leva as tarefas analisadas prontas para serem agendadas de um status vazio para um status agendado. O executor, também em execução no contêiner do agendador no Fargate, coloca essas tarefas em fila e define o status como Em fila. Quando os operadores têm capacidade, retiram a tarefa da fila e definem o status como executando, que posteriormente muda o status para Êxito ou Falha com base no sucesso ou falha da tarefa.

Parâmetros

Esta seção descreve as opções de configuração disponíveis para tarefas do Apache Airflow e os casos de uso delas.

As opções de configuração padrão que o Amazon MWAA substitui estão marcadas em. red

Apache Airflow v2
Versão Opção de configuração Padrão Descrição Caso de uso

v2

core.parallelism

Definido dinamicamente com base em (maxWorkers * maxCeleryWorkers) / schedulers * 1.5.

O número máximo de instâncias de tarefas que podem ter o status “Em execução”.

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas que podem ser executadas simultaneamente. O valor especificado deve ser o número de Operadores disponíveis “vezes” a densidade de tarefas dos Operadores. Recomendamos alterar esse valor somente quando você estiver vendo um grande número de tarefas bloqueadas no estado “Em execução” ou “Em fila”.

v2

core.dag_concurrency

10000

O número de instâncias de tarefas que podem ser executadas simultaneamente para cada DAG.

Você pode usar essa opção para liberar recursos ao aumentar o número de instâncias de tarefas autorizadas a serem executadas simultaneamente. Por exemplo, se você tiver cem DAGs com dez tarefas paralelas e quiser que todas DAGs sejam executadas simultaneamente, você pode calcular o paralelismo máximo como o número de trabalhadores disponíveis “vezes” a densidade da tarefa de trabalhadorescelery.worker_concurrency, dividido pelo número de DAGs (por exemplo, 100).

v2

core.execute_tasks_new_python_interpreter

True

Determina se o Apache Airflow executa tarefas bifurcando o processo principal ou criando um processo em Python.

Quando definido como True, o Apache Airflow reconhece as alterações feitas nos seus plug-ins como um novo processo Python criado para executar tarefas.

v2

celery.worker_concurrency

N/D

O Amazon MWAA substitui a instalação básica do Airflow por essa opção para escalar Operadores como parte de seu componente de escalonamento automático.

Any value specified for this option is ignored.

v2

celery.worker_autoscale

mw1.micro - 3,0

mw1.small: 5,0

mw1.medium: 10,0

mw1.large: 20,0

mw1.xlarge: 40,0

mw1.2xlarge: 80,0

A simultaneidade de tarefas para Operadores.

Você pode usar essa opção para liberar recursos ao reduzir a simultaneidade maximum, minimum de tarefas dos Operadores. Os Operadores aceitam até maximum tarefas simultâneas configuradas, independentemente de haver recursos suficientes para fazer isso. Se as tarefas forem agendadas sem recursos suficientes, elas falharão imediatamente. Recomendamos alterar esse valor em tarefas que consomem muitos recursos ao reduzir os valores para serem menores que os padrões a fim de permitir mais capacidade por tarefa.