Python Kombu
I'm using kombu to talk to RabbitMQ in a producer/consumer setup. I launched my producer, which placed 100 jobs on a queue (I have only one queue and one exchange). I would like to launch multiple consumers simultaneously and have each consumer process one job at a time. Unfortunately, the consumers are blocking each other: while one consumer works through the jobs, the other consumers sit idle. If I kill the working consumer, one of the other consumers kicks in and starts working. Is there a way to have all the consumers running simultaneously, each processing a different job from the queue? My consumer code is below:
def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks = []
        queues = []
        callbacks.append(self._callback)
        queues.append(self.incoming_queue)
        print('opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name))
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()
        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print('Error -> %s' % e)
You need to set the consumer prefetch count to 1 (see basic_qos: https://kombu.readthedocs.org/en/latest/reference/kombu.transport.pyamqp.html#kombu.transport.pyamqp.Connection.Channel.basic_qos). Without a prefetch limit, the broker can push every waiting message to the first consumer that connects, which leaves nothing for the others. With the prefetch count set to 1, each consumer holds only one unacknowledged message at a time and the rest stay in the queue in the ready state. So if you have 2 consumers with QoS set to 1 and 100 messages, 2 tasks are processed simultaneously.
I've added the missing parts to your code to set the prefetch count:
def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks = []
        queues = []
        callbacks.append(self._callback)
        queues.append(self.incoming_queue)
        print('opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name))
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()
        # Open a channel and cap it at one unacknowledged message at a time.
        channel = self.rabbitmq_connection.channel()
        channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)
        # Pass the channel explicitly so the consumer uses the QoS-limited one.
        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print('Error -> %s' % e)
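To see why the prefetch limit changes the behaviour, here is a rough, broker-free simulation. It is only a toy model, not kombu or RabbitMQ code: the names ToyBroker, ToyConsumer and run are made up for illustration, and the dispatch loop is a simplification of what a real broker does. It assumes the producer publishes all jobs before any consumer connects, as in the question.

```python
from collections import deque


class ToyBroker:
    """Toy model of a queue that pushes messages to consumers (not a real broker)."""

    def __init__(self):
        self.ready = deque()   # messages still waiting in the queue
        self.consumers = []    # registered consumers, in connection order

    def publish(self, message):
        self.ready.append(message)
        self.dispatch()

    def register(self, consumer):
        self.consumers.append(consumer)
        self.dispatch()

    def dispatch(self):
        # Hand ready messages to any consumer that still has prefetch capacity.
        progress = True
        while self.ready and progress:
            progress = False
            for consumer in self.consumers:
                if self.ready and consumer.has_capacity():
                    consumer.unacked.append(self.ready.popleft())
                    progress = True

    def ack(self, consumer):
        # The consumer finished its oldest message, which frees one slot.
        consumer.unacked.popleft()
        self.dispatch()


class ToyConsumer:
    def __init__(self, name, prefetch_count=0):
        self.name = name
        self.prefetch_count = prefetch_count  # 0 means "no limit"
        self.unacked = deque()                # delivered but not yet acked

    def has_capacity(self):
        return self.prefetch_count == 0 or len(self.unacked) < self.prefetch_count


def run(prefetch_count, jobs=100, workers=3):
    broker = ToyBroker()
    for job in range(jobs):
        broker.publish(job)            # the producer runs first
    consumers = [ToyConsumer("c%d" % i, prefetch_count) for i in range(workers)]
    for consumer in consumers:
        broker.register(consumer)      # consumers connect afterwards
    return broker, consumers


if __name__ == "__main__":
    for limit in (0, 1):
        broker, consumers = run(prefetch_count=limit)
        print(limit, [len(c.unacked) for c in consumers], len(broker.ready))
    # prints:
    # 0 [100, 0, 0] 0
    # 1 [1, 1, 1] 97
```

With no limit, the first consumer ends up holding all 100 jobs and the others hold none, which matches what you are seeing. With a limit of 1, each consumer holds one job and the other 97 stay ready in the queue. Note that in this model a consumer only gets its next job after an ack, so with a prefetch count of 1 your callback most likely needs to call message.ack() when it finishes each job (assuming you have not turned on no_ack).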
I think you're essentially trying to rewrite Celery on your own:
http://www.celeryproject.org/
Unless you're doing it purely for learning purposes, spare yourself the pain and use Celery. Incidentally, kombu and RabbitMQ are precisely what Celery uses as its backend (a Redis backend is also available, which saved me untold hours of effort in some applications).