I have a following project directory:
azima: __init.py main.py tasks.py monitor.py
tasks.py
from .main import app @app.task def add(x, y): return x + y @app.task def mul(x, y): return x * y @app.task def xsum(numbers): return sum(numbers)
main.py
from celery import Celery app = Celery('azima', backend='redis://localhost:6379/0', broker='redis://localhost:6379/0', include=['azima.tasks']) # Optional configuration, see the application user guide. app.conf.update( result_expires=3600, ) if __name__ == '__main__': app.start()
monitor.py
from .main import app def my_monitor(app): state = app.events.State() def announce_failed_tasks(event): state.event(event) task = state.tasks.get(event['uuid']) print(f'TASK FAILED: {task.name}[{task.uuid}]') def announce_succeeded_tasks(event): print('task succeeded') state.event(event) task = state.tasks.get(event['uuid']) print(f'TASK SUCCEEDED: {task.name}[{task.uuid}]') def worker_online_handler(event): state.event(event) print("New worker gets online") print(event['hostname'], event['timestamp'], event['freq'], event['sw_ver']) with app.connection() as connection: recv = app.events.Receiver(connection, handlers={ 'task-failed': announce_failed_tasks, 'task-succeeded': announce_succeeded_tasks, 'worker-online': worker_online_handler, '*': state.event, }) recv.capture(limit=None, timeout=None, wakeup=True) if __name__ == '__main__': # app = Celery('azima') my_monitor(app)
Started celery worker with
celery -A azima.main worker -l INFO
And started monitor.py
with
python -m azima.monitor
But Only worker-online
event is being triggered, while other events like task-succeeded
is not triggered or handled.
What am I missing here?
Advertisement
Answer
Enable worker task-
group events with cli option -E
or --task-events
and try to capture all events:
def my_monitor(app): def on_event(event): print("Event.type", event.get('type')) with app.connection() as connection: recv = app.events.Receiver(connection, handlers={'*': on_event}) recv.capture(limit=None, timeout=None, wakeup=True)