I am using the Quart framework and also need to use the RabbitMQ Pika connector, but I can't get them to play nicely together because both of them run infinite loops.
Entrypoint:
from quart import Quart
from .service import Service

app = Quart(__name__)

@app.before_serving
async def startup():
    app.service_task = asyncio.ensure_future(service.start())

if not service.initialise():
    sys.exit()
Service Class:
class Service:
    def __init__(self, new_instance):
        self._connection = None
        self._channel = None
        self._messaging_thread = None

    def initialise(self):
        credentials = pika.PlainCredentials('username', 'password')
        parameters = pika.ConnectionParameters('localhost', credentials=credentials)
        self._connection = pika.BlockingConnection(parameters)
        self._channel = self._connection.channel()
        self._channel.queue_declare(queue='to_be_processed_queue')
        self._channel.basic_consume(queue='to_be_processed_queue',
                                    auto_ack=True,
                                    on_message_callback=self.callback)
        print('creating thread')
        self._messaging_thread = Thread(target=self.run_consume())
        #self._messaging_thread.start()
        print('Thread created...')

    def run_consume(self):
        try:
            self._channel.start_consuming()
        except KeyboardInterrupt:
            self._shutdown()
The code isn't even getting to print('Thread created...'), and I don't understand why. From this question I understand that RabbitMQ isn't thread-safe, but I don't see how else to run RabbitMQ.
Answer
Pika is not thread safe, as you have already spotted, but this is not why your program blocks.
Your problem might be here:
print('creating thread')
self._messaging_thread = Thread(target=self.run_consume())
#self._messaging_thread.start()
Does it work better if you remove the parentheses from run_consume? As written, you are not creating a thread that runs the method; you are calling self.run_consume() on the spot, in the calling thread, and it never returns because start_consuming() blocks.
self._messaging_thread = Thread(target=self.run_consume)
would be my first attempt.
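To make the difference concrete, here is a minimal sketch that uses only the standard library. The names worker, log and caller_id are made up for illustration and have nothing to do with Pika; the point is only to show in which thread the function body runs in each case.

```python
import threading

caller_id = threading.get_ident()  # identity of the thread running this script
log = []


def worker():
    # Record which thread actually executed the function body.
    log.append(threading.get_ident())


# With parentheses: worker() runs immediately, in the calling thread,
# and Thread only receives its return value (None) as the target.
not_really_a_thread = threading.Thread(target=worker())

# Without parentheses: the function object is handed to Thread and
# runs in the new thread once start() is called.
real_thread = threading.Thread(target=worker)
real_thread.start()
real_thread.join()

print(log[0] == caller_id)  # first call ran in the calling thread
print(log[1] == caller_id)  # second call ran somewhere else
```

In your code the function called on the spot is run_consume, which blocks in start_consuming(), so the Thread(...) line never finishes and the next print is never reached.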
However, because Pika is not thread safe, you must also move the connection and channel creation into the thread instead of doing it in the main program. What you have might work if you do not use the connection or channel anywhere else, but the correct way with Pika is to contain absolutely everything in the thread and not share any Pika objects between threads, as you do here.
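A rough sketch of what "everything in the thread" could look like, assuming Pika 1.x and the same host, credentials and queue name as in the question. The helper names (pika_connect, consume_forever, start_consumer_thread, print_message) are hypothetical, not part of Pika, and the connect parameter exists only so the connection factory can be swapped out. Treat this as an illustration of the idea rather than a finished implementation.

```python
import threading


def print_message(channel, method, properties, body):
    # Pika 1.x calls on_message_callback with these four arguments.
    print(f"received: {body!r}")


def pika_connect():
    # Imported here so the sketch can be loaded without Pika installed.
    import pika

    credentials = pika.PlainCredentials('username', 'password')
    parameters = pika.ConnectionParameters('localhost', credentials=credentials)
    return pika.BlockingConnection(parameters)


def consume_forever(connect, queue_name, on_message):
    # Everything Pika-related is created, used and closed in this thread.
    connection = connect()
    try:
        channel = connection.channel()
        channel.queue_declare(queue=queue_name)
        channel.basic_consume(queue=queue_name, auto_ack=True,
                              on_message_callback=on_message)
        channel.start_consuming()  # blocks until consuming stops
    finally:
        if connection.is_open:
            connection.close()


def start_consumer_thread(connect=pika_connect,
                          queue_name='to_be_processed_queue',
                          on_message=print_message):
    # Note: target is the function itself, not the result of calling it.
    thread = threading.Thread(target=consume_forever,
                              args=(connect, queue_name, on_message),
                              daemon=True)
    thread.start()
    return thread
```

With something like this, the Quart side would only call start_consumer_thread() (for example from the before_serving hook) and never touch the connection or channel itself. If another thread later needs to ask the consumer to do something, such as stop, BlockingConnection.add_callback_threadsafe is the mechanism Pika provides for scheduling work on the connection's own thread.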