Skip to content
Advertisement

python celery monitoring events not being triggered

I have the following project directory:

azima:
    __init__.py
    main.py
    tasks.py
    monitor.py

tasks.py

from .main import app

@app.task
def add(x, y):
    """Celery task: return ``x + y``."""
    total = x + y
    return total

@app.task
def mul(x, y):
    """Celery task: return ``x * y``."""
    product = x * y
    return product

@app.task
def xsum(numbers):
    """Celery task: return the built-in ``sum`` of the iterable ``numbers``."""
    total = sum(numbers)
    return total

main.py

from celery import Celery

# The same Redis URL is used for both the message broker and the result
# backend; the task module is registered through ``include``.
app = Celery(
    'azima',
    broker='redis://localhost:6379/0',
    backend='redis://localhost:6379/0',
    include=['azima.tasks'],
)

# Optional configuration, see the application user guide.
app.conf.update(result_expires=3600)

if __name__ == '__main__':
    # Only start the app when this module is executed as a script.
    app.start()

monitor.py

from .main import app

def my_monitor(app):
    """Capture events from ``app`` and print a line for selected event types.

    An ``app.events.State()`` object is fed every event that is received.
    Dedicated handlers print a message for ``task-failed``,
    ``task-succeeded`` and ``worker-online``; every other event type is
    passed straight to ``state.event`` through the ``'*'`` handler.

    ``capture`` is invoked with ``limit=None`` and ``timeout=None``.

    NOTE: task-* events are reportedly only sent when the worker is started
    with ``-E`` / ``--task-events``; without it only worker events show up.

    Args:
        app: The Celery application whose connection and events API are used.
    """
    state = app.events.State()

    def _tracked_task(event):
        # Record the event in the state first, then look up the task by uuid.
        state.event(event)
        return state.tasks.get(event['uuid'])

    def on_task_failed(event):
        task = _tracked_task(event)
        print(f'TASK FAILED: {task.name}[{task.uuid}]')

    def on_task_succeeded(event):
        # This marker line is printed before the state is updated.
        print('task succeeded')
        task = _tracked_task(event)
        print(f'TASK SUCCEEDED: {task.name}[{task.uuid}]')

    def on_worker_online(event):
        state.event(event)
        print("New worker gets online")
        fields = ('hostname', 'timestamp', 'freq', 'sw_ver')
        print(*[event[field] for field in fields])

    handlers = {
        'task-failed': on_task_failed,
        'task-succeeded': on_task_succeeded,
        'worker-online': on_worker_online,
        '*': state.event,
    }

    with app.connection() as connection:
        receiver = app.events.Receiver(connection, handlers=handlers)
        receiver.capture(limit=None, timeout=None, wakeup=True)

if __name__ == '__main__':
    # Script entry point: run the monitor with the ``app`` imported from
    # ``.main`` at the top of this module.
    # app = Celery('azima')
    my_monitor(app)

I started the Celery worker with:

celery -A azima.main worker -l INFO

And I started monitor.py with:

python -m azima.monitor

But only the worker-online event is triggered; other events, such as task-succeeded, are neither triggered nor handled.

enter image description here

What am I missing here?

Advertisement

Answer

Enable the worker's task-group events with the CLI option -E (or --task-events), e.g. `celery -A azima.main worker -l INFO -E`, and then try to capture all events:

def my_monitor(app):
    """Print the ``type`` field of every event captured from ``app``.

    A single catch-all (``'*'``) handler is registered, so each received
    event is reported regardless of its type.

    Args:
        app: The Celery application whose connection and events API are used.
    """
    def report_event_type(event):
        event_type = event.get('type')
        print("Event.type", event_type)

    catch_all = {'*': report_event_type}

    with app.connection() as connection:
        receiver = app.events.Receiver(connection, handlers=catch_all)
        receiver.capture(limit=None, timeout=None, wakeup=True)
User contributions licensed under: CC BY-SA
7 people found this helpful
Advertisement