In some of my Apache Airflow installations, DAGs or tasks that are scheduled to run do not run even when the scheduler doesn’t appear to be fully loaded. How can I increase the number of DAGs or tasks that can run concurrently?
Similarly, if my installation is under high load and I want to limit how quickly my Airflow workers pull queued tasks (such as to reduce resource consumption), what can I adjust to reduce the average load?
Answer
Here’s an expanded list of the configuration options available as of Airflow v1.10.2. Some can be set per DAG or per operator; when they are not specified there, they fall back to the setup-wide defaults.
Options that can be specified on a per-DAG basis:
- concurrency: the number of task instances allowed to run concurrently across all active runs of the DAG it is set on. Defaults to core.dag_concurrency if not set
- max_active_runs: maximum number of active runs for this DAG. Once this limit is hit, the scheduler will not create new active DAG runs. Defaults to core.max_active_runs_per_dag if not set
Examples:
from airflow import DAG

# Only allow one run of this DAG to be running at any given time
dag = DAG('my_dag_id', max_active_runs=1)
# Allow a maximum of 10 tasks to be running across a max of 2 active DAG runs
dag = DAG('example2', concurrency=10, max_active_runs=2)
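To see how the two per-DAG options interact with the setup-wide limits, here is a simplified, hypothetical model (not Airflow's scheduler code). `dag_running_cap`, `CONFIG`, and the `width` parameter are illustrative names, and the numbers in `CONFIG` are assumed values rather than Airflow's shipped defaults. The model ignores pools and executor capacity, and treats core.parallelism as if this DAG were the only one running, so the result is only an upper bound.

```python
from typing import Optional

# Hypothetical installation-wide settings (assumed, not Airflow's defaults)
CONFIG = {
    "core.parallelism": 32,
    "core.dag_concurrency": 16,
    "core.max_active_runs_per_dag": 16,
}


def dag_running_cap(
    width: int,
    concurrency: Optional[int] = None,
    max_active_runs: Optional[int] = None,
) -> int:
    """Upper bound on task instances of one DAG running at the same time.

    width: the most tasks a single run of this DAG can have ready at once.
    Per-DAG arguments left as None fall back to the setup-wide values,
    mirroring how concurrency and max_active_runs default to core.* options.
    """
    if concurrency is None:
        concurrency = CONFIG["core.dag_concurrency"]
    if max_active_runs is None:
        max_active_runs = CONFIG["core.max_active_runs_per_dag"]
    return min(
        concurrency,                 # DAG-level cap across all of its runs
        max_active_runs * width,     # runs allowed x tasks ready per run
        CONFIG["core.parallelism"],  # installation-wide cap
    )


# The 'example2' settings: concurrency=10, max_active_runs=2
print(dag_running_cap(width=3, concurrency=10, max_active_runs=2))  # 6
print(dag_running_cap(width=8, concurrency=10, max_active_runs=2))  # 10
```

With the example2 settings, a DAG whose runs have at most three tasks ready at once is limited by max_active_runs (2 × 3 = 6), while a wider DAG reaches its concurrency cap of 10 first.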
Options that can be specified on a per-operator basis:
- pool: the pool to execute the task in. Pools can be used to limit parallelism for only a subset of tasks
- task_concurrency: concurrency limit for the same task across multiple DAG runs
Example:
from airflow.models import BaseOperator
t1 = BaseOperator(task_id='t1', pool='my_custom_pool', task_concurrency=12)  # the pool must already exist
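Pools and task_concurrency both gate whether a queued task instance may start. The toy model below sketches that admission check under stated assumptions: `admit`, `QueuedTI`, and the queue contents are hypothetical, every task instance is assumed to occupy exactly one slot, and tasks outside any pool are assumed to share the core.non_pooled_task_slot_count budget. Airflow's real scheduler also applies core.parallelism, DAG concurrency, and task priority, none of which are modeled here.

```python
from collections import Counter
from typing import Dict, List, Optional, Tuple

# (task_id, run_id, pool); pool None means the task is not in a pool
QueuedTI = Tuple[str, str, Optional[str]]


def admit(
    queued: List[QueuedTI],
    pool_slots: Dict[str, int],
    non_pooled_slots: int,
    task_concurrency: Dict[str, int],
) -> List[QueuedTI]:
    """Toy model: which queued task instances start, checked in queue order.

    An instance starts only if its pool (or the shared non-pooled budget)
    has a free slot AND its task has not reached its task_concurrency
    limit across DAG runs. Tasks without a limit are unrestricted here.
    """
    pool_used: Counter = Counter()
    task_used: Counter = Counter()
    started = []
    for ti in queued:
        task_id, _run_id, pool = ti
        pool_limit = non_pooled_slots if pool is None else pool_slots.get(pool, 0)
        task_limit = task_concurrency.get(task_id, float("inf"))
        if pool_used[pool] < pool_limit and task_used[task_id] < task_limit:
            pool_used[pool] += 1
            task_used[task_id] += 1
            started.append(ti)
    return started


queue = [
    ("t1", "run1", "my_custom_pool"),
    ("t1", "run2", "my_custom_pool"),
    ("t2", "run1", "my_custom_pool"),
    ("t3", "run1", None),
]
result = admit(queue, {"my_custom_pool": 2}, non_pooled_slots=1,
               task_concurrency={"t1": 1})
print([ti[0] for ti in result])  # ['t1', 't2', 't3']
```

Here the second run of t1 waits because of task_concurrency, not because the pool is full; t2 then takes the pool's last free slot, and t3 runs from the non-pooled budget.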
Options that are specified across an entire Airflow setup:
- core.parallelism: maximum number of tasks running across an entire Airflow installation
- core.dag_concurrency: max number of tasks that can be running per DAG (across multiple DAG runs)
- core.non_pooled_task_slot_count: number of task slots allocated to tasks not running in a pool
- core.max_active_runs_per_dag: maximum number of active DAG runs, per DAG
- scheduler.max_threads: how many threads the scheduler process should use to schedule DAGs
- celery.worker_concurrency: max number of task instances that a worker will process at a time if using CeleryExecutor
- celery.sync_parallelism: number of processes CeleryExecutor should use to sync task state
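Each setup-wide option lives in airflow.cfg under its section ([core], [scheduler], [celery]), and Airflow also reads overrides from environment variables named AIRFLOW__{SECTION}__{KEY}. The sketch below builds those names from the section.key form used in the list above; `airflow_env_var` is a hypothetical helper, and the values are illustrative, not recommendations. Setting os.environ only affects the current process and the processes it launches, so in practice these variables belong in the environment of the scheduler and worker processes before they start. Several of these names changed in later releases (for example, scheduler.max_threads became scheduler.parsing_processes in Airflow 2.0, and core.dag_concurrency became core.max_active_tasks_per_dag in 2.2), so check the configuration reference for your version.

```python
import os


def airflow_env_var(option: str) -> str:
    """Turn a 'section.key' option name into Airflow's env var name."""
    section, key = option.split(".", 1)
    return f"AIRFLOW__{section.upper()}__{key.upper()}"


# Illustrative values only: raise the global and per-DAG caps,
# and lower per-worker concurrency to reduce load on each Celery worker.
overrides = {
    "core.parallelism": 64,
    "core.dag_concurrency": 32,
    "celery.worker_concurrency": 4,
}

for option, value in overrides.items():
    # Environment variable values must be strings
    os.environ[airflow_env_var(option)] = str(value)

print(airflow_env_var("scheduler.max_threads"))  # AIRFLOW__SCHEDULER__MAX_THREADS
```

Raising core.parallelism and core.dag_concurrency targets the first question (more work running at once), while lowering celery.worker_concurrency caps how many task instances each worker processes at a time, which targets the second.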