Skip to content
Advertisement

Run task from another periodic task with celery

I have periodic task which should trigger another task. Final expected behavior: first task should collect some data from external service and then loop over this data (list) and call another task with passing over argument (current iteration in loop). I want to have those tasks in loop being asynchronical.

I wrote code that runs a task in period, but I can’t figure out how this task should call another task, because when I do it by .delay() method then nothing happens.

Here is some simplified code that I want to run:

@celery_app.task(name="Hello World")
def hello_world():
    print(f"HELLO WORLD PRINT")
    add.delay(2, 2)
    return 'Hello'


@celery_app.task
def add(x, y):
    with open(f"./{str(datetime.datetime.now())}.txt", 'w') as file:
        file.write(str(x+y))
    print(f"x + y = {x + y}")
    return x + y

For now hello_world() is running every 30 sec and as a result I receive HELLO WORLD PRINT in logs, but add task is not running. I can’t see either print or file that should be created by this task.

Update for comment, here is how I use queue:

celery_app.conf.task_routes = {
    "project.app.hello_world": {
        "queue": 'test_queue'
    },
    "project.app.add": {
        "queue": 'test_queue'
    },

Advertisement

Answer

There are few ways to solve the problem.

The obvious one is to put the queue name in the .apply_async, for an example add.apply_async(10, 10, queue="test_queue").

Another solution is to put the queue into the task annotation, ie @celery_app.task(queue="test_queue").

I have never configured task_routes, but I believe it is possible to specify it there like you tried…

User contributions licensed under: CC BY-SA
2 People found this is helpful
Advertisement