Skip to content
Advertisement

RQ task from Flask keeps adding to queue

My setup is:

  • Running Redis on docker locally
  • running RQ in virtualenv(rq)
  • running Flask in virtualenv(rq)

Goal is to launch RQ tasks by hitting the Flask app.

Redis seems to running fine, as does the rq worker (launched by “rq worker”)

Simple tasks get queued and done. The simple subprocessed “ls” task2 runs fine.

But my subprocessed python does not!

flask python code (ex1.py):

from flask import Flask, request
import redis
from rq import Queue

import time 

app = Flask(__name__)

r = redis.Redis() #can use password
q = Queue(connection=r)

def task2():
    import subprocess
    result = subprocess.run(['ls', '-l'], stdout=subprocess.PIPE)
    output = result.stdout.decode('utf-8')
    print(output)
    return output

def pyv():
    import subprocess
    result = subprocess.run(['python', '--version'], stdout=subprocess.PIPE)
    output = result.stdout.decode('utf-8')
    print(output)
    return output


@app.route('/t2')
def t2():
    job = q.enqueue(task2)
    q_len = len(q)
    return f'Task {job.id} added to queue at {job.enqueued_at}. {q_len} tasks in the queue'

@app.route('/pyv')
def pyv():
    job = q.enqueue(pyv)
    q_len = len(q)
    return f'Task {job.id} added to queue at {job.enqueued_at}. {q_len} tasks in the queue'


@app.route('/get_tasks')
def get_tasks():
    q_len = len(q)
    return f'{q_len} tasks in the queue'

if __name__ == '__main__':
    app.run()

When i hit the endpoint ‘/pyv’, the output of my “rq worker” window goes nuts and appears to be adding items(???)…here is a sample…

17:57:33 default: ex1.pyv() (5378aa3c-9c01-4b42-ad84-f22021de1983)
17:57:34 default: Job OK (5378aa3c-9c01-4b42-ad84-f22021de1983)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (0d1edd70-9916-4ffc-bf45-692c60bbc327)
17:57:34 default: Job OK (0d1edd70-9916-4ffc-bf45-692c60bbc327)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (99402ee1-909e-474b-b410-be0dfff3f9ea)
17:57:34 default: Job OK (99402ee1-909e-474b-b410-be0dfff3f9ea)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (d049328d-e5d9-4767-b3f0-3356aff3d59b)
17:57:34 default: Job OK (d049328d-e5d9-4767-b3f0-3356aff3d59b)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (d2cb2ab8-86fe-43e3-8bb9-dc3576672d60)
17:57:34 default: Job OK (d2cb2ab8-86fe-43e3-8bb9-dc3576672d60)
17:57:34 Result is kept for 500 seconds
17:57:34 default: ex1.pyv() (00e8ccbf-2a39-48e6-bd1c-27e6ca982cdc)
17:57:34 default: Job OK (00e8ccbf-2a39-48e6-bd1c-27e6ca982cdc)

…and so on…it doesn’t stop until I Cntl+c and restart the docker Redis.

Any ideas? I’m guessing that it should be fine to launch commandline apps like this using this framework? This is my first stab at RQ+Flask. My ultimate goal is to wrap up a heavy python commandline app that uses Tensorflow. I will limit my workers to 2 and I have ensured that the app only uses half of my GPU memory…all of this works fine as-is.

But first, I need to solve this issue! –thanks.

Advertisement

Answer

The “answer” is that I used the same function name (pyv) twice.

def pyv():
    import subprocess
    result = subprocess.run(['python', '--version'], stdout=subprocess.PIPE)
    output = result.stdout.decode('utf-8')
    print(output)
    return output

and

@app.route('/pyv')
def pyv():
    job = q.enqueue(pyv)
    q_len = len(q)
    return f'Task {job.id} added to queue at {job.enqueued_at}. {q_len} tasks in the queue'

So the take-home is do not name your task function the same as your flask function!!

User contributions licensed under: CC BY-SA
6 People found this is helpful
Advertisement