I have a periodic task that should trigger another task. The expected behavior: the first task collects some data from an external service, then loops over that data (a list) and calls another task for each item, passing the current item as an argument. I want the tasks called in the loop to run asynchronously.
I wrote code that runs a task periodically, but I can't figure out how this task should call another task: when I call it with the .delay() method, nothing happens.
Here is some simplified code that I want to run:
@celery_app.task(name="Hello World")
def hello_world():
    print(f"HELLO WORLD PRINT")
    add.delay(2, 2)
    return 'Hello'


@celery_app.task
def add(x, y):
    with open(f"./{str(datetime.datetime.now())}.txt", 'w') as file:
        file.write(str(x+y))
    print(f"x + y = {x + y}")
    return x + y
For now hello_world() runs every 30 seconds, and I see HELLO WORLD PRINT in the logs, but the add task does not run. I see neither the print output nor the file that this task should create.
Update in response to a comment: here is how I route the tasks to a queue:
celery_app.conf.task_routes = {
    "project.app.hello_world": {"queue": 'test_queue'},
    "project.app.add": {"queue": 'test_queue'},
}
Answer
There are a few ways to solve the problem.
The obvious one is to pass the queue name to .apply_async, for example add.apply_async((10, 10), queue="test_queue"). Note that the task's positional arguments go in a single tuple, not as separate arguments.
Another solution is to set the queue in the task decorator, i.e. @celery_app.task(queue="test_queue").
I have never configured task_routes, but I believe it is possible to specify it there like you tried…