
Running RabbitMQ Pika with Quart

I am using the Quart framework and also need the RabbitMQ Pika connector, but I can't get them to play nicely together because both run their own infinite loops.

Entrypoint:

import asyncio
import sys

from quart import Quart
from .service import Service

app = Quart(__name__)

@app.before_serving
async def startup():
    app.service_task = asyncio.ensure_future(service.start())

if not service.initialise():
    sys.exit()

Service Class:

from threading import Thread

import pika

class Service:
    def __init__(self, new_instance):
        self._connection = None
        self._channel = None
        self._messaging_thread = None

    def initialise(self):
        credentials = pika.PlainCredentials('username', 'password')
        parameters = pika.ConnectionParameters('localhost', credentials=credentials)
        self._connection = pika.BlockingConnection(parameters)
        self._channel = self._connection.channel()
        self._channel.queue_declare(queue='to_be_processed_queue')

        self._channel.basic_consume(queue='to_be_processed_queue',
                                    auto_ack=True,
                                    on_message_callback=self.callback)

        print('creating thread')
        self._messaging_thread = Thread(target=self.run_consume())
        #self._messaging_thread.start()
        print('Thread created...')

    def run_consume(self):
        try:
            self._channel.start_consuming()
        except KeyboardInterrupt:
            self._shutdown()

The code never reaches print('Thread created...'), and I don't understand why. From this question I understand that Pika isn't thread-safe, but I don't see how else to run the RabbitMQ consumer.


Answer

Pika is not thread-safe, as you have already spotted, but that is not why your program blocks.

Your problem might be here:

print('creating thread')
self._messaging_thread = Thread(target=self.run_consume())
#self._messaging_thread.start()

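Does it work better if you remove the parentheses after run_consume? As written, you are not handing the method to the thread: self.run_consume() is called on the spot, in the calling thread, and because start_consuming() blocks, it never returns. The Thread object is never even constructed, which is why the second print is not reached.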

self._messaging_thread = Thread(target=self.run_consume)

would be my first attempt. The thread will also only run once the commented-out start() call is restored.
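
To make the difference concrete, here is a small standalone sketch that uses only the standard library. The run_consume function below is a stand-in for the blocking consume loop (it merely records which thread executed it), and the variable names are made up for illustration.

```python
import threading
from threading import Thread

ran_in = []  # idents of the threads that executed run_consume

def run_consume():
    # Stand-in for the blocking consume loop: just note who ran it.
    ran_in.append(threading.get_ident())

caller = threading.get_ident()

# Passing the function object: nothing runs until start() is called.
worker = Thread(target=run_consume)
nothing_ran_yet = (ran_in == [])

worker.start()
worker.join()
ran_in_worker = (ran_in[-1] != caller)

# Calling it inline: run_consume() executes here, in the calling thread,
# and Thread only receives its return value (None) as the target.
broken = Thread(target=run_consume())
ran_in_caller = (ran_in[-1] == caller)
```

With a real start_consuming() in place of the stand-in, the inline call would block at that last statement and nothing after it would execute.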

However, because Pika is not thread-safe, you should also move the connection and channel setup into the thread instead of doing it in the main program. Your current code might work as long as nothing else touches the connection, but the correct approach with Pika is to keep absolutely everything inside the consuming thread and not share any Pika objects between threads, as you do now.
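
A minimal sketch of that layout might look like the following. The names consume_forever, handle_message and make_consumer_thread are hypothetical, the host and credentials are the placeholders from the question, and the daemon flag is an assumption made so the thread does not keep the process alive at shutdown. Pika is imported inside the thread function only to underline that no Pika object exists outside it.

```python
from threading import Thread

QUEUE_NAME = "to_be_processed_queue"

def handle_message(channel, method, properties, body):
    # Hypothetical handler; replace with the real processing logic.
    print(f"received {body!r}")

def consume_forever():
    # Everything Pika-related is created, used and closed in this thread.
    import pika

    credentials = pika.PlainCredentials("username", "password")
    parameters = pika.ConnectionParameters("localhost", credentials=credentials)
    connection = pika.BlockingConnection(parameters)
    try:
        channel = connection.channel()
        channel.queue_declare(queue=QUEUE_NAME)
        channel.basic_consume(queue=QUEUE_NAME,
                              on_message_callback=handle_message,
                              auto_ack=True)
        channel.start_consuming()  # blocks, but only this thread
    finally:
        if connection.is_open:
            connection.close()

def make_consumer_thread():
    # Note: target is the function itself, not the result of calling it.
    return Thread(target=consume_forever, name="pika-consumer", daemon=True)
```

Calling make_consumer_thread().start() from a startup hook such as before_serving should then return immediately, leaving the Quart event loop free to run.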

User contributions licensed under: CC BY-SA