Performance tuning for Apache Airflow on Amazon MWAA

This page describes the best practices we recommend for tuning the performance of an Amazon Managed Workflows for Apache Airflow environment using Apache Airflow configuration options on Amazon MWAA.

Adding an Apache Airflow configuration option

The following procedure walks you through the steps of adding an Airflow configuration option to your environment.

  1. Open the Environments page on the Amazon MWAA console.

  2. Choose an environment.

  3. Choose Edit.

  4. Choose Next.

  5. Choose Add custom configuration in the Airflow configuration options pane.

  6. Choose a configuration from the dropdown list and enter a value, or type a custom configuration and enter a value.

  7. Choose Add custom configuration for each configuration you want to add.

  8. Choose Save.
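If you manage environments from code rather than the console, a similar change can be made through the Amazon MWAA UpdateEnvironment API. The sketch below assumes a boto3 `mwaa` client and its `update_environment` call, which takes an `AirflowConfigurationOptions` map of string keys to string values. The helper name `apply_airflow_options` is hypothetical. We also assume the map you pass is the complete set of options you want on the environment; check the UpdateEnvironment API reference for how omitted options are treated before relying on this.

```python
from typing import Any, Mapping


def apply_airflow_options(client: Any, environment_name: str,
                          options: Mapping[str, object]) -> dict:
    """Send Airflow configuration options to an MWAA environment.

    `client` is expected to be a boto3 MWAA client, e.g. boto3.client("mwaa").
    Values are converted to strings because the API takes a string-to-string map.
    """
    payload = {key: str(value) for key, value in options.items()}
    return client.update_environment(
        Name=environment_name,
        AirflowConfigurationOptions=payload,
    )
```

For example, `apply_airflow_options(boto3.client("mwaa"), "MyAirflowEnvironment", {"scheduler.dag_dir_list_interval": 600})` would request a longer DAG folder scan interval; "MyAirflowEnvironment" is a placeholder name, and the call needs AWS credentials allowed to update the environment.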

To learn more, see Using Apache Airflow configuration options on Amazon MWAA.

Apache Airflow scheduler

The Apache Airflow scheduler is a core component of Apache Airflow. An issue with the scheduler can prevent DAGs from being parsed and tasks from being scheduled. For more information about Apache Airflow scheduler tuning, see Fine-tuning your scheduler performance on the Apache Airflow documentation website.

Parameters

This section describes the configuration options available for the Apache Airflow scheduler and their use cases.

Apache Airflow v2
celery.sync_parallelism
  • Version: v2
  • Default: 1
  • Description: The number of processes the Celery Executor uses to sync task state.
  • Use case: You can use this option to prevent queue conflicts by limiting the processes the Celery Executor uses. By default, the value is set to 1 to prevent errors in delivering task logs to CloudWatch Logs. Setting the value to 0 means using the maximum number of processes, but might cause errors when delivering task logs.

scheduler.processor_poll_interval
  • Version: v2
  • Default: 1 second
  • Description: The number of seconds to wait between consecutive DAG file processing runs in the scheduler loop.
  • Use case: You can use this option to free up CPU usage on the scheduler by increasing the time the scheduler sleeps after it has finished retrieving DAG parsing results, finding and queuing tasks, and executing queued tasks in the executor. Increasing this value affects the scheduler threads that run on an environment, as set by scheduler.parsing_processes for Apache Airflow v2 and scheduler.max_threads for Apache Airflow v1. This may reduce the capacity of the schedulers to parse DAGs, and increase the time it takes for DAGs to appear in the web server.

scheduler.max_dagruns_to_create_per_loop
  • Version: v2
  • Default: 10
  • Description: The maximum number of DAGs to create DagRuns for per scheduler loop.
  • Use case: You can use this option to free up resources for scheduling tasks by decreasing the maximum number of DAGs for which DagRuns are created in each scheduler loop.

scheduler.parsing_processes
  • Version: v2
  • Default: Set using the formula (2 * number of vCPUs) - 1.
  • Description: The number of processes the scheduler can run in parallel to parse DAGs.
  • Use case: You can use this option to free up resources by decreasing the number of processes the scheduler runs in parallel to parse DAGs. We recommend keeping this number low if DAG parsing is impacting task scheduling. The value must not exceed the number of threads available for your environment class. To learn more, see Limits.
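To see what the scheduler.parsing_processes default works out to, the formula from the entry above can be evaluated directly. This is a minimal sketch; `default_parsing_processes` is a hypothetical helper, and you pass in the vCPU count of your own scheduler.

```python
def default_parsing_processes(vcpus: int) -> int:
    """Default scheduler.parsing_processes: (2 * number of vCPUs) - 1."""
    if vcpus < 1:
        raise ValueError("a scheduler needs at least one vCPU")
    return 2 * vcpus - 1
```

For example, `default_parsing_processes(4)` gives 7, which matches the mw1.large figure described in Limits.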

Limits

This section describes the limits you should consider when adjusting the default parameters for the scheduler.

scheduler.parsing_processes, scheduler.max_threads

Two threads are allowed per vCPU for an environment class, and at least one thread must be reserved for the scheduler. If you notice a delay in tasks being scheduled, you may need to increase your environment class. For example, a large environment has a 4 vCPU Fargate container instance for its scheduler. This means that a maximum of 7 threads are available to use for other processes: two threads multiplied by four vCPUs, minus one for the scheduler itself. The value you specify in scheduler.max_threads and scheduler.parsing_processes must not exceed the number of threads available for an environment class, as shown below:

  • mw1.small – Must not exceed 1 thread for other processes. The remaining thread is reserved for the Scheduler.

  • mw1.medium – Must not exceed 3 threads for other processes. The remaining thread is reserved for the Scheduler.

  • mw1.large – Must not exceed 7 threads for other processes. The remaining thread is reserved for the Scheduler.
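The limits above can be checked before you save a configuration change. In this sketch, the vCPU counts for mw1.small and mw1.medium are inferred from the listed limits (2 * vCPUs - 1); only the mw1.large figure of 4 vCPUs is stated explicitly. Rejecting values below 1 is our own assumption, and both function names are hypothetical.

```python
# Scheduler vCPUs per environment class. mw1.small and mw1.medium are inferred
# from the thread limits (1 and 3); mw1.large (4 vCPUs) is stated in the text.
SCHEDULER_VCPUS = {"mw1.small": 1, "mw1.medium": 2, "mw1.large": 4}


def max_parsing_threads(environment_class: str) -> int:
    """Two threads per vCPU, minus one reserved for the scheduler itself."""
    vcpus = SCHEDULER_VCPUS[environment_class]
    return 2 * vcpus - 1


def check_thread_option(environment_class: str, value: int) -> None:
    """Raise if scheduler.parsing_processes / scheduler.max_threads is out of range.

    The lower bound of 1 is an assumption; the upper bound follows the limits above.
    """
    limit = max_parsing_threads(environment_class)
    if not 1 <= value <= limit:
        raise ValueError(
            f"{environment_class} allows 1..{limit} threads for other processes, got {value}"
        )
```

For example, `check_thread_option("mw1.medium", 3)` passes, while `check_thread_option("mw1.small", 2)` raises because mw1.small leaves only one thread for other processes.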

DAG folders

The Apache Airflow scheduler continuously scans the DAGs folder on your environment for any contained plugins.zip files, or Python (.py) files containing “airflow” import statements. Any resulting Python DAG objects are then placed into a DagBag for that file, to be processed by the scheduler to determine what, if any, tasks need to be scheduled. DAG file parsing occurs regardless of whether the files contain any viable DAG objects.
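The file-selection rule described above can be approximated in a few lines. This is a sketch under stated assumptions, not a reimplementation of Airflow: it treats every .zip archive as a candidate (generalizing the plugins.zip case), and it uses a plain substring check for "airflow", which is looser than matching import statements. Airflow's own discovery logic may apply additional checks. The function name `candidate_files` is hypothetical.

```python
from pathlib import Path


def candidate_files(dags_folder: str) -> list[Path]:
    """Return files the scheduler would consider, per the simplified rule above.

    Assumptions: every .zip archive is a candidate, and a .py file qualifies
    if its text contains the string "airflow" anywhere.
    """
    found = []
    for path in sorted(Path(dags_folder).rglob("*")):
        if not path.is_file():
            continue
        if path.suffix == ".zip":
            found.append(path)
        elif path.suffix == ".py":
            text = path.read_text(encoding="utf-8", errors="ignore")
            if "airflow" in text:
                found.append(path)
    return found
```

Because parsing happens whether or not a file yields a viable DAG, every file this kind of check returns adds parsing work, which is one reason to keep unrelated Python files out of the DAGs folder.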

Parameters

This section describes the configuration options available for the DAGs folder and their use cases.

Apache Airflow v2
scheduler.dag_dir_list_interval
  • Version: v2
  • Default: 300 seconds
  • Description: How often, in seconds, the DAGs folder is scanned for new files.
  • Use case: You can use this option to free up resources by increasing the interval between scans of the DAGs folder. We recommend increasing this value if you're seeing long parsing times in total_parse_time metrics, which may be due to a large number of files in your DAGs folder.

scheduler.min_file_process_interval
  • Version: v2
  • Default: 30 seconds
  • Description: The number of seconds after which the scheduler parses a DAG file again, and after which updates to the DAG are reflected.
  • Use case: You can use this option to free up resources by increasing the number of seconds that the scheduler waits before parsing a DAG file again. For example, if you specify a value of 30, the DAG file is parsed every 30 seconds. We recommend keeping this number high to decrease the CPU usage on your environment.
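A quick way to reason about these two intervals is to turn them into hourly ceilings. The sketch below is a rough upper bound only: it assumes each folder scan and each per-file parse happens at most once per its interval, and real throughput can be lower, for example when parsing processes are saturated. The function name is hypothetical.

```python
def hourly_parse_ceiling(num_dag_files: int,
                         dag_dir_list_interval: float = 300,
                         min_file_process_interval: float = 30) -> dict:
    """Rough upper bounds on DAG-folder work per hour.

    Folder scans happen at most once per dag_dir_list_interval; each file is
    parsed at most once per min_file_process_interval.
    """
    if dag_dir_list_interval <= 0 or min_file_process_interval <= 0:
        raise ValueError("intervals must be positive")
    return {
        "folder_scans": 3600 / dag_dir_list_interval,
        "file_parses": num_dag_files * 3600 / min_file_process_interval,
    }
```

With the defaults and 50 DAG files, `hourly_parse_ceiling(50)` returns `{"folder_scans": 12.0, "file_parses": 6000.0}`; raising min_file_process_interval to 120 lowers the parse ceiling to 1500.0.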

DAG files

As part of the Apache Airflow scheduler loop, individual DAG files are parsed to extract DAG Python objects. In Apache Airflow v2 and above, the scheduler parses at most as many files at the same time as there are parsing processes (scheduler.parsing_processes). The number of seconds specified in scheduler.min_file_process_interval must pass before the same file is parsed again.
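The interaction between parsing processes and scheduler.min_file_process_interval can be estimated with a deliberately simplified model, which is our assumption and not Airflow's exact scheduling logic: files are parsed in batches of parsing_processes, every file takes the same time to parse, and no file is re-parsed before the interval has passed. The function name is hypothetical.

```python
import math


def estimated_reparse_period(num_files: int, parsing_processes: int,
                             seconds_per_file: float,
                             min_file_process_interval: float = 30) -> float:
    """Estimate how often each DAG file gets re-parsed, in seconds.

    A full pass needs ceil(num_files / parsing_processes) batches, and a file
    is never re-parsed sooner than min_file_process_interval.
    """
    if num_files < 1 or parsing_processes < 1:
        raise ValueError("need at least one file and one parsing process")
    full_pass = math.ceil(num_files / parsing_processes) * seconds_per_file
    return float(max(full_pass, min_file_process_interval))
```

Under this model, 20 files with 7 processes at 2 seconds each give 30.0 seconds (the interval dominates), while 300 files give 86.0 seconds, in which case the parsing capacity, not the interval, limits how often each file is refreshed.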

Parameters

This section describes the configuration options available for Apache Airflow DAG files and their use cases.

Apache Airflow v2
core.dag_file_processor_timeout
  • Version: v2
  • Default: 50 seconds
  • Description: The number of seconds before the DagFileProcessor times out processing a DAG file.
  • Use case: You can use this option to free up resources by increasing the time before the DagFileProcessor times out. We recommend increasing this value if you're seeing timeouts in your DAG processing logs that result in no viable DAGs being loaded.

core.dagbag_import_timeout
  • Version: v2
  • Default: 30 seconds
  • Description: The number of seconds before importing a Python file times out.
  • Use case: You can use this option to free up resources by increasing the time before the scheduler times out while importing a Python file to extract the DAG objects. This option is processed as part of the scheduler loop, and its value must be lower than the value specified in core.dag_file_processor_timeout.

core.min_serialized_dag_update_interval
  • Version: v2
  • Default: 30 seconds
  • Description: The minimum number of seconds after which serialized DAGs in the database are updated.
  • Use case: You can use this option to free up resources by increasing the number of seconds after which serialized DAGs in the database are updated. We recommend increasing this value if you have a large number of DAGs, or complex DAGs. Increasing this value reduces the load on the scheduler and the database as DAGs are serialized.

core.min_serialized_dag_fetch_interval
  • Version: v2
  • Default: 10 seconds
  • Description: How often, in seconds, a serialized DAG is re-fetched from the database when it is already loaded in the DagBag.
  • Use case: You can use this option to free up resources by increasing the number of seconds between re-fetches of a serialized DAG. A value higher than the value specified in core.min_serialized_dag_update_interval avoids re-fetching DAGs more often than they are updated, which reduces database read rates. Increasing this value reduces the load on the web server and the database.
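The one hard constraint above, that core.dagbag_import_timeout must be lower than core.dag_file_processor_timeout, is easy to check before applying a change. This sketch assumes option values arrive as strings or numbers in seconds and falls back to the defaults listed above for missing keys; the function name is hypothetical.

```python
def check_dag_file_options(options: dict) -> list[str]:
    """Return problems found in DAG-file timeout options (values in seconds).

    Missing keys fall back to the defaults listed above (50 and 30 seconds).
    """
    processor_timeout = float(options.get("core.dag_file_processor_timeout", 50))
    import_timeout = float(options.get("core.dagbag_import_timeout", 30))
    problems = []
    if import_timeout >= processor_timeout:
        problems.append(
            "core.dagbag_import_timeout must be lower than core.dag_file_processor_timeout"
        )
    return problems
```

For example, raising only core.dagbag_import_timeout to "60" produces one problem, while raising core.dag_file_processor_timeout to "120" at the same time produces none.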

Tasks

The Apache Airflow scheduler and workers are both involved in queuing and de-queuing tasks. The scheduler moves parsed tasks that are ready to schedule from a None status to a Scheduled status. The executor, which also runs on the scheduler container in Fargate, queues those tasks and sets their status to Queued. When a worker has capacity, it takes the task from the queue and sets its status to Running. The status then changes to Success or Failed, depending on whether the task succeeds or fails.
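The lifecycle described above can be sketched as a small state machine. This is a simplified model of the happy path only: Airflow has additional task states (for example, for retries) that are left out here, and the names `TaskState`, `TRANSITIONS`, and `advance` are hypothetical.

```python
from enum import Enum


class TaskState(Enum):
    NONE = "none"
    SCHEDULED = "scheduled"
    QUEUED = "queued"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"


# Happy-path transitions from the description above, with the component
# responsible for each step noted in the comment.
TRANSITIONS = {
    TaskState.NONE: {TaskState.SCHEDULED},      # scheduler
    TaskState.SCHEDULED: {TaskState.QUEUED},    # executor on the scheduler container
    TaskState.QUEUED: {TaskState.RUNNING},      # a worker with free capacity
    TaskState.RUNNING: {TaskState.SUCCESS, TaskState.FAILED},
    TaskState.SUCCESS: set(),
    TaskState.FAILED: set(),
}


def advance(current: TaskState, target: TaskState) -> TaskState:
    """Move a task to `target`, rejecting transitions not listed above."""
    if target not in TRANSITIONS[current]:
        raise ValueError(f"cannot go from {current.name} to {target.name}")
    return target
```

A task that stays in QUEUED in this model is waiting on worker capacity, which is why the worker concurrency options that follow matter for queued backlogs.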

Parameters

This section describes the configuration options available for Apache Airflow tasks and their use cases.

The default configuration options that Amazon MWAA overrides are marked in red.

Apache Airflow v2
core.parallelism
  • Version: v2
  • Default: Dynamically set based on (maxWorkers * maxCeleryWorkers) / schedulers * 1.5.
  • Description: The maximum number of task instances that can have a status of Running.
  • Use case: You can use this option to free up resources by increasing the number of task instances that can run simultaneously. The value specified should be the number of available workers multiplied by the workers' task density. We recommend changing this value only when you're seeing a large number of tasks stuck in the “Running” or “Queued” state.

core.dag_concurrency
  • Version: v2
  • Default: 10000
  • Description: The number of task instances allowed to run concurrently for each DAG.
  • Use case: You can use this option to free up resources by increasing the number of task instances allowed to run concurrently. For example, if you have one hundred DAGs with ten parallel tasks each, and you want all DAGs to run concurrently, you can calculate the maximum parallelism as the number of available workers multiplied by the workers' task density in celery.worker_concurrency, divided by the number of DAGs (in this example, 100).

core.execute_tasks_new_python_interpreter
  • Version: v2
  • Default: True
  • Description: Determines whether Apache Airflow executes tasks by forking the parent process, or by creating a new Python process.
  • Use case: When set to True, Apache Airflow recognizes changes you make to your plugins, because a new Python process is created to execute each task.

celery.worker_concurrency
  • Version: v2
  • Default: N/A
  • Description: Amazon MWAA overrides the Airflow base install for this option to scale workers as part of its autoscaling component.
  • Use case: Any value specified for this option is ignored.

celery.worker_autoscale
  • Version: v2
  • Default: mw1.small: 5,0; mw1.medium: 10,0; mw1.large: 20,0; mw1.xlarge: 40,0; mw1.2xlarge: 80,0
  • Description: The task concurrency for workers, given as maximum,minimum.
  • Use case: You can use this option to free up resources by reducing the maximum and minimum task concurrency of workers. Workers accept up to the maximum concurrent tasks configured, regardless of whether there are sufficient resources to do so. If tasks are scheduled without sufficient resources, the tasks immediately fail. We recommend changing this value for resource-intensive tasks by reducing the values to be less than the defaults to allow more capacity per task.
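Two of the calculations above can be sketched directly: reading a celery.worker_autoscale value in its maximum,minimum form, and the core.dag_concurrency estimate of workers multiplied by task density, divided by the number of DAGs. The worker counts in the usage note are hypothetical, rounding down to a whole number of tasks is our choice, and both function names are hypothetical.

```python
def parse_worker_autoscale(value: str) -> tuple[int, int]:
    """Split a celery.worker_autoscale value "max,min" into (max, min)."""
    max_text, min_text = value.split(",")
    max_tasks, min_tasks = int(max_text), int(min_text)
    if min_tasks > max_tasks:
        raise ValueError("minimum concurrency cannot exceed maximum")
    return max_tasks, min_tasks


def per_dag_concurrency(workers: int, tasks_per_worker: int, num_dags: int) -> int:
    """Workers multiplied by worker task density, divided by the number of DAGs.

    Rounds down so the result is a whole number of task instances.
    """
    if num_dags < 1:
        raise ValueError("need at least one DAG")
    return (workers * tasks_per_worker) // num_dags
```

For example, `parse_worker_autoscale("20,0")` returns `(20, 0)` for the mw1.large default. With a hypothetical 10 workers at 20 tasks each and 100 DAGs, `per_dag_concurrency(10, 20, 100)` gives 2, well short of the 10 parallel tasks per DAG in the example above, which would need 1,000 task slots in total.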