I have the following project directory:
JavaScript
x
6
1
azima:
2
__init__.py
3
main.py
4
tasks.py
5
monitor.py
6
tasks.py
JavaScript
1
14
14
1
from .main import app
2
3
@app.task
def add(x, y):
    """Return the sum of ``x`` and ``y``; registered as a Celery task on ``app``."""
    return x + y
6
7
@app.task
def mul(x, y):
    """Return the product of ``x`` and ``y``; registered as a Celery task on ``app``."""
    return x * y
10
11
@app.task
def xsum(numbers):
    """Return the sum of the iterable ``numbers``; registered as a Celery task on ``app``."""
    return sum(numbers)
14
main.py
JavaScript
1
12
12
1
from celery import Celery
2
3
# Celery application for the ``azima`` package. The same local Redis database
# (localhost:6379, db 0) is used as both the message broker and the result
# backend, and ``azima.tasks`` is listed in ``include`` so its tasks are loaded.
app = Celery(
    'azima',
    backend='redis://localhost:6379/0',
    broker='redis://localhost:6379/0',
    include=['azima.tasks'],
)

# Optional configuration, see the application user guide.
# ``result_expires`` is expressed in seconds in the Celery configuration
# reference, so stored results are kept for one hour.
app.conf.update(
    result_expires=3600,
)

if __name__ == '__main__':
    # Hand control to Celery's own command-line entry point when run directly.
    app.start()
12
monitor.py
JavaScript
1
37
37
1
from .main import app
2
3
def my_monitor(app):
    """Capture Celery events from ``app`` and print task/worker activity.

    Opens a broker connection, registers one handler per event type of
    interest, and then captures events with no limit and no timeout, so the
    call does not return on its own.

    Note: as pointed out in the answer to this question, task events such as
    ``task-succeeded`` and ``task-failed`` are only sent when the worker is
    started with ``-E`` / ``--task-events``; without it only worker events
    (for example ``worker-online``) arrive here.

    Args:
        app: The Celery application whose events should be monitored.
    """
    # In-memory view of the cluster, updated by feeding it every event.
    state = app.events.State()

    def announce_failed_tasks(event):
        """Record a ``task-failed`` event and print the failed task."""
        state.event(event)
        task = state.tasks.get(event['uuid'])

        print(f'TASK FAILED: {task.name}[{task.uuid}]')

    def announce_succeeded_tasks(event):
        """Record a ``task-succeeded`` event and print the finished task."""
        print('task succeeded')
        state.event(event)
        task = state.tasks.get(event['uuid'])

        print(f'TASK SUCCEEDED: {task.name}[{task.uuid}]')

    def worker_online_handler(event):
        """Record a ``worker-online`` event and print the worker's details."""
        state.event(event)
        print("New worker gets online")
        print(event['hostname'], event['timestamp'], event['freq'], event['sw_ver'])

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={
            'task-failed': announce_failed_tasks,
            'task-succeeded': announce_succeeded_tasks,
            'worker-online': worker_online_handler,
            # Catch-all entry: keep ``state`` up to date for every other event.
            '*': state.event,
        })
        recv.capture(limit=None, timeout=None, wakeup=True)
32
33
if __name__ == '__main__':
    # app = Celery('azima')
    # Monitor the application imported from ``.main`` above.
    my_monitor(app)
36
37
I started the Celery worker with:
JavaScript
1
2
1
celery -A azima.main worker -l INFO
2
And I started monitor.py with:
JavaScript
1
2
1
python -m azima.monitor
2
But only the worker-online event is being triggered, while other events like task-succeeded are not triggered or handled.
What am I missing here?
Advertisement
Answer
Enable worker task-group events with the CLI option -E or --task-events (for example, celery -A azima.main worker -l INFO -E), and try to capture all events:
JavaScript
1
8
1
def my_monitor(app):
    """Print the type of every event received from ``app``'s workers.

    A single catch-all handler is registered, so this is a quick way to check
    which event types actually reach the monitor. Capturing runs with no
    limit and no timeout, so the call does not return on its own.

    Args:
        app: The Celery application whose events should be monitored.
    """
    def on_event(event):
        """Print the ``type`` field of the received event."""
        print("Event.type", event.get('type'))

    with app.connection() as connection:
        recv = app.events.Receiver(connection, handlers={'*': on_event})
        recv.capture(limit=None, timeout=None, wakeup=True)
8