Running a worker on a different machine results in the errors shown below. I have followed the configuration instructions and have synced the dags folder.
I would also like to confirm that RabbitMQ and PostgreSQL only need to be installed on the Airflow core machine and do not need to be installed on the workers (the workers only connect to the core).
The specification of the setup is detailed below:
Airflow core/server computer
Has the following installed:
- Python 2.7 with
- airflow (AIRFLOW_HOME = ~/airflow)
- celery
- psycopg2
- RabbitMQ
- PostgreSQL
Configurations made in airflow.cfg:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.2:5672//
celery_result_backend = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
Tests performed:
- RabbitMQ is running
- Can connect to PostgreSQL and have confirmed that Airflow has created tables
- Can start and view the webserver (including custom dags)
Airflow worker computer
Has the following installed:
- Python 2.7 with
- airflow (AIRFLOW_HOME = ~/airflow)
- celery
- psycopg2
Configurations made in airflow.cfg are exactly the same as on the server:
sql_alchemy_conn = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
executor = CeleryExecutor
broker_url = amqp://username:password@192.168.1.2:5672//
celery_result_backend = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
Output from commands run on the worker machine:
When running airflow flower:
ubuntu@airflow_client:~/airflow$ airflow flower
[2016-06-13 04:19:42,814] {__init__.py:36} INFO - Using executor CeleryExecutor
Traceback (most recent call last):
  File "/home/ubuntu/anaconda2/bin/airflow", line 15, in <module>
    args.func(args)
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/airflow/bin/cli.py", line 576, in flower
    os.execvp("flower", ['flower', '-b', broka, port, api])
  File "/home/ubuntu/anaconda2/lib/python2.7/os.py", line 346, in execvp
    _execvpe(file, args)
  File "/home/ubuntu/anaconda2/lib/python2.7/os.py", line 382, in _execvpe
    func(fullname, *argrest)
OSError: [Errno 2] No such file or directory
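(I suspect this particular failure just means the flower executable cannot be found on this machine's PATH, since flower is installed separately from celery, e.g. with pip install flower. The worker error below is the one I cannot explain.)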
When running airflow worker:
ubuntu@airflow_client:~$ airflow worker
[2016-06-13 04:08:43,573] {__init__.py:36} INFO - Using executor CeleryExecutor
[2016-06-13 04:08:43,935: ERROR/MainProcess] Unrecoverable error: ImportError('No module named postgresql',)
Traceback (most recent call last):
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/worker/__init__.py", line 206, in start
    self.blueprint.start(self)
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/bootsteps.py", line 119, in start
    self.on_start()
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/apps/worker.py", line 169, in on_start
    string(self.colored.cyan(' \n', self.startup_info())),
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/apps/worker.py", line 230, in startup_info
    results=self.app.backend.as_uri(),
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/kombu/utils/__init__.py", line 325, in __get__
    value = obj.__dict__[self.__name__] = self.__get(obj)
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/app/base.py", line 626, in backend
    return self._get_backend()
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/app/base.py", line 444, in _get_backend
    self.loader)
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/backends/__init__.py", line 68, in get_backend_by_url
    return get_backend_cls(backend, loader), url
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/celery/backends/__init__.py", line 49, in get_backend_cls
    cls = symbol_by_name(backend, aliases)
  File "/home/ubuntu/anaconda2/lib/python2.7/site-packages/kombu/utils/__init__.py", line 96, in symbol_by_name
    module = imp(module_name, package=package, **kwargs)
  File "/home/ubuntu/anaconda2/lib/python2.7/importlib/__init__.py", line 37, in import_module
    __import__(name)
ImportError: No module named postgresql
When celery_result_backend is changed to the default db+mysql://airflow:airflow@localhost:3306/airflow and airflow worker is run again, the result is:
ubuntu@airflow_client:~/airflow$ airflow worker
[2016-06-13 04:17:32,387] {__init__.py:36} INFO - Using executor CeleryExecutor

 -------------- celery@airflow_client2 v3.1.23 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.19.0-59-generic-x86_64-with-debian-jessie-sid
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         airflow.executors.celery_executor:0x7f5cb65cb510
- ** ---------- .> transport:   amqp://username:**@192.168.1.2:5672//
- ** ---------- .> results:     mysql://airflow:**@localhost:3306/airflow
- *** --- * --- .> concurrency: 16 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> default          exchange=default(direct) key=celery

[2016-06-13 04:17:33,385] {__init__.py:36} INFO - Using executor CeleryExecutor
Starting flask
[2016-06-13 04:17:33,737] {_internal.py:87} INFO -  * Running on http://0.0.0.0:8793/ (Press CTRL+C to quit)
[2016-06-13 04:17:34,536: WARNING/MainProcess] celery@airflow_client2 ready.
What am I missing? How can I diagnose this further?
Answer
The ImportError: No module named postgresql error is due to the invalid prefix used in your celery_result_backend. When using a database as a Celery result backend, the connection URL must be prefixed with db+. See https://docs.celeryproject.org/en/stable/userguide/configuration.html#conf-database-result-backend
So replace:
celery_result_backend = postgresql+psycopg2://username:password@192.168.1.2:5432/airflow
with something like:
celery_result_backend = db+postgresql://username:password@192.168.1.2:5432/airflow
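If the worker still fails after this change, it may be worth confirming that the worker machine can actually reach PostgreSQL on the core machine, since the db+ prefix only tells Celery which result backend class to load. A minimal sketch, run on the worker and assuming the host and credentials from the config above, using the psycopg2 package that is already installed there:

# Verifies only that PostgreSQL at 192.168.1.2 accepts remote connections
# with these credentials; it does not exercise Celery itself.
import psycopg2

conn = psycopg2.connect(host="192.168.1.2", port=5432, dbname="airflow",
                        user="username", password="password")
cur = conn.cursor()
cur.execute("SELECT 1")
print(cur.fetchone())  # expected output: (1,)
conn.close()

If this check fails, PostgreSQL on the core machine likely needs to be configured to accept remote connections (listen_addresses and pg_hba.conf). Note also that sql_alchemy_conn keeps the plain SQLAlchemy form (postgresql+psycopg2://...); the db+ prefix applies only to celery_result_backend.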