I have two separate RabbitMQ instances. I’m trying to find the best way to listen to events from both.
For example, I can consume events on one with the following:
```python
import pika

credentials = pika.PlainCredentials(user, password)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="host1", credentials=credentials))
channel = connection.channel()
result = channel.queue_declare(exclusive=True)
channel.queue_bind(result.method.queue,
                   exchange="my-exchange",
                   routing_key='*.*.*.*.*')
channel.basic_consume(callback_func, result.method.queue, no_ack=True)
channel.start_consuming()
```
I have a second host, “host2”, that I’d like to listen to as well. I thought about creating two separate threads to do this, but from what I’ve read, pika isn’t thread safe. Is there a better way? Or would creating two separate threads, each listening to a different Rabbit instance (host1 and host2), be sufficient?
Answer
The answer to “what is the best way” depends heavily on how you use your queues and what you mean by “best”. Since I can’t comment on questions yet, I’ll just suggest some possible solutions.
In each example I’m going to assume the exchange is already declared.
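The binding key `*.*.*.*.*` used in every example only makes sense if `my-exchange` is a topic exchange; that is an assumption, since the question doesn't say. In a RabbitMQ topic exchange, `*` matches exactly one dot-separated word and `#` matches zero or more words. The pure-Python sketch below mimics that rule to show which routing keys such a binding would receive. `topic_matches` is a hypothetical helper for illustration only, not part of pika or RabbitMQ.

```python
def topic_matches(binding_key: str, routing_key: str) -> bool:
    """Approximate RabbitMQ topic-exchange matching (illustrative only)."""
    pattern = binding_key.split(".")
    words = routing_key.split(".")

    def match(i: int, j: int) -> bool:
        # Pattern exhausted: it matches only if every routing-key word was used.
        if i == len(pattern):
            return j == len(words)
        if pattern[i] == "#":
            # '#' may swallow zero or more words.
            return any(match(i + 1, k) for k in range(j, len(words) + 1))
        if j == len(words):
            return False
        # '*' matches exactly one word; any other token must match literally.
        if pattern[i] in ("*", words[j]):
            return match(i + 1, j + 1)
        return False

    return match(0, 0)


if __name__ == "__main__":
    print(topic_matches("*.*.*.*.*", "eu.shop.order.created.v1"))  # True
    print(topic_matches("*.*.*.*.*", "eu.shop.order.created"))     # False
```

In other words, `*.*.*.*.*` receives every message whose routing key has exactly five words, and nothing else.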
Threads
You can consume messages from two queues on separate hosts in a single process using pika.
You are right: as its own FAQ states, pika is not thread safe, but it can be used in a multi-threaded manner by creating a separate connection to RabbitMQ in each thread. Running this example in threads with the threading module looks as follows:
```python
import pika
import threading


class ConsumerThread(threading.Thread):
    def __init__(self, host, *args, **kwargs):
        super(ConsumerThread, self).__init__(*args, **kwargs)
        self._host = host

    # Not necessarily a method.
    def callback_func(self, channel, method, properties, body):
        print("{} received '{}'".format(self.name, body))

    def run(self):
        # Each thread opens its own connection; no pika object is shared.
        credentials = pika.PlainCredentials("guest", "guest")
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(host=self._host,
                                      credentials=credentials))
        channel = connection.channel()
        result = channel.queue_declare(exclusive=True)
        channel.queue_bind(result.method.queue,
                           exchange="my-exchange",
                           routing_key="*.*.*.*.*")
        # Pre-1.0 pika signature: basic_consume(callback, queue, no_ack=...)
        channel.basic_consume(self.callback_func,
                              result.method.queue,
                              no_ack=True)
        channel.start_consuming()


if __name__ == "__main__":
    threads = [ConsumerThread("host1"), ConsumerThread("host2")]
    for thread in threads:
        thread.start()
```
I’ve declared callback_func as a method purely to use ConsumerThread.name while printing the message body. It might as well be a function outside the ConsumerThread class.
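The essential rule from pika's FAQ is that each thread creates and uses its own connection and channel, and nothing pika-related crosses thread boundaries. The sketch below shows that shape with the standard library only (Python 3), so it runs without a broker. `FakeConnection` is a hypothetical stand-in for `pika.BlockingConnection` that just replays a fixed list of messages, and its thread-ownership check is something this fake adds; don't rely on pika to catch sharing for you. Results travel back through a `queue.Queue`, which is safe to share between threads.

```python
import queue
import threading


class FakeConnection:
    """Hypothetical stand-in for pika.BlockingConnection; replays fixed messages."""

    def __init__(self, host, messages):
        self.host = host
        self._messages = list(messages)
        # Remember which thread created this connection.
        self._owner = threading.get_ident()

    def consume(self):
        # Explicit check added by this fake: refuse use from any other thread.
        if threading.get_ident() != self._owner:
            raise RuntimeError("connection used outside the thread that created it")
        yield from self._messages


class ConsumerThread(threading.Thread):
    def __init__(self, host, messages, results):
        super().__init__(name=f"consumer-{host}")
        self._host = host
        self._messages = messages
        self._results = results  # queue.Queue is thread safe

    def run(self):
        # The connection is created inside run(), so only this thread touches it.
        connection = FakeConnection(self._host, self._messages)
        for body in connection.consume():
            self._results.put((self.name, self._host, body))


def consume_all(feeds):
    """Start one thread per host, wait for all, return (thread, host, body) tuples."""
    results = queue.Queue()
    threads = [ConsumerThread(host, msgs, results) for host, msgs in feeds.items()]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return [results.get() for _ in range(results.qsize())]


if __name__ == "__main__":
    for item in consume_all({"host1": ["a", "b"], "host2": ["c"]}):
        print(item)
```

The order in which the two threads' messages interleave is not deterministic; only the per-thread ownership of each connection is guaranteed.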
Processes
Alternatively, you can always just run one consumer process per queue you want to consume events from.
```python
import pika
import sys


def callback_func(channel, method, properties, body):
    print(body)


if __name__ == "__main__":
    # The RabbitMQ host is taken from the first command-line argument.
    credentials = pika.PlainCredentials("guest", "guest")
    connection = pika.BlockingConnection(
        pika.ConnectionParameters(host=sys.argv[1],
                                  credentials=credentials))
    channel = connection.channel()
    result = channel.queue_declare(exclusive=True)
    channel.queue_bind(result.method.queue,
                       exchange="my-exchange",
                       routing_key="*.*.*.*.*")
    # Pre-1.0 pika signature: basic_consume(callback, queue, no_ack=...)
    channel.basic_consume(callback_func, result.method.queue, no_ack=True)
    channel.start_consuming()
```
and then run it once per host:
```
$ python single_consume.py host1
$ python single_consume.py host2  # e.g. on another console
```
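If the work you do on messages from the queues is CPU-heavy, and as long as the number of CPU cores is at least the number of consumers, this approach is generally better, because separate processes are not limited by CPython's GIL, which prevents threads in one process from executing Python code in parallel. The exception is when your queues are empty most of the time, so the consumers wouldn’t use that CPU time anyway*.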
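Instead of starting each consumer by hand in its own console, a small launcher can start one child process per host with `subprocess` and wait for them all. In this sketch, `launch_consumers` and `wait_all` are hypothetical helpers; `command` is the script plus any arguments that precede the host (normally just `["single_consume.py"]`), and extra keyword arguments go straight to `subprocess.Popen`.

```python
import subprocess
import sys


def launch_consumers(hosts, command, **popen_kwargs):
    """Start `python <command...> <host>` once per host; return the Popen objects."""
    return [
        subprocess.Popen([sys.executable, *command, host], **popen_kwargs)
        for host in hosts
    ]


def wait_all(processes):
    """Block until every child exits; return exit codes in launch order."""
    return [process.wait() for process in processes]
```

For example, `wait_all(launch_consumers(["host1", "host2"], ["single_consume.py"]))` is equivalent to the two console commands above, except that one terminal owns both consumers. If you pipe the children's output (`stdout=subprocess.PIPE`), read it with `communicate()` rather than `wait()`, so a full pipe buffer can't stall a child.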
Async
Another alternative is to use an asynchronous framework (for example Twisted) and run the whole thing in a single thread.
You can no longer use BlockingConnection in asynchronous code; fortunately, pika has an adapter for Twisted:
```python
from pika.adapters.twisted_connection import TwistedProtocolConnection
from pika.connection import ConnectionParameters
from twisted.internet import protocol, reactor, task
from twisted.python import log


class Consumer(object):
    def on_connected(self, connection):
        d = connection.channel()
        d.addCallback(self.got_channel)
        d.addCallback(self.queue_declared)
        d.addCallback(self.queue_bound)
        d.addCallback(self.handle_deliveries)
        d.addErrback(log.err)

    def got_channel(self, channel):
        self.channel = channel
        return self.channel.queue_declare(exclusive=True)

    def queue_declared(self, queue):
        self._queue_name = queue.method.queue
        # Return the result so the chain waits for the bind before consuming.
        return self.channel.queue_bind(queue=self._queue_name,
                                       exchange="my-exchange",
                                       routing_key="*.*.*.*.*")

    def queue_bound(self, ignored):
        return self.channel.basic_consume(queue=self._queue_name)

    def handle_deliveries(self, queue_and_consumer_tag):
        queue, consumer_tag = queue_and_consumer_tag
        # Repeatedly pull the next delivery; LoopingCall waits for each
        # returned Deferred before calling consume_from_queue again.
        self.looping_call = task.LoopingCall(self.consume_from_queue, queue)
        return self.looping_call.start(0)

    def consume_from_queue(self, queue):
        d = queue.get()
        return d.addCallback(lambda result: self.handle_payload(*result))

    def handle_payload(self, channel, method, properties, body):
        print(body)


if __name__ == "__main__":
    consumer1 = Consumer()
    consumer2 = Consumer()

    parameters = ConnectionParameters()
    cc = protocol.ClientCreator(reactor, TwistedProtocolConnection, parameters)

    d1 = cc.connectTCP("host1", 5672)
    d1.addCallback(lambda protocol: protocol.ready)
    d1.addCallback(consumer1.on_connected)
    d1.addErrback(log.err)

    d2 = cc.connectTCP("host2", 5672)
    d2.addCallback(lambda protocol: protocol.ready)
    d2.addCallback(consumer2.on_connected)
    d2.addErrback(log.err)

    reactor.run()
```
This approach becomes more attractive the more queues you consume from and the less CPU-bound the consumers’ work is*.
Python 3
Since you’ve mentioned pika, I’ve restricted myself to Python 2.x-based solutions, because at the time of writing pika had not yet been ported to Python 3.
But if you want to move to Python >= 3.3, one possible option is to use asyncio together with an AMQP client library (AMQP being the protocol RabbitMQ speaks), e.g. asynqp or aioamqp.
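With asyncio, the “one thread, several connections” idea looks roughly like the sketch below. Real code would open each connection with asynqp or aioamqp; here a hypothetical `fake_broker` coroutine feeds one `asyncio.Queue` per host instead, so the sketch runs on the standard library alone (Python 3.7+ for `asyncio.run`). Both consumers run as tasks on a single event loop, and every message is handled by the same OS thread.

```python
import asyncio
import threading


async def fake_broker(inbox: asyncio.Queue, messages):
    """Hypothetical stand-in for a RabbitMQ connection delivering messages."""
    for body in messages:
        await asyncio.sleep(0)  # yield to the loop, like waiting on a socket
        await inbox.put(body)
    await inbox.put(None)  # sentinel: no more messages


async def consume(host, inbox: asyncio.Queue, received):
    while True:
        body = await inbox.get()
        if body is None:
            break
        # Record the handling thread to show that it is always the same one.
        received.append((host, body, threading.get_ident()))


async def main(feeds):
    received = []
    coroutines = []
    for host, messages in feeds.items():
        inbox = asyncio.Queue()
        coroutines.append(fake_broker(inbox, messages))
        coroutines.append(consume(host, inbox, received))
    # Run all brokers and consumers concurrently on one event loop.
    await asyncio.gather(*coroutines)
    return received


if __name__ == "__main__":
    for host, body, _ in asyncio.run(main({"host1": ["a", "b"], "host2": ["c"]})):
        print(host, body)
```

As with the Twisted version, this only pays off when consumers spend most of their time waiting on the network; a CPU-heavy handler would block every other consumer on the loop.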
* Please note that these are very shallow tips; in most cases the choice is not that obvious. What is best for you depends on queue “saturation” (messages per unit of time), what work you do upon receiving these messages, what environment you run your consumers in, etc. There’s no way to be sure other than to benchmark all implementations.