Python Kombu

我正在使用kombu通过生产者/消费者模型来管理RabbitMQ。 我启动了我的制作人,在一个队列中放置了100个工作(我只有一个队列和一个交换)。 我想同时推出多个消费者,并让每个消费者一次处理一份工作。 不幸的是,消费者互相阻碍(即一个消费者从队列中抓住工作,其他消费者只是闲置)。 如果我杀了工作的消费者,那么其他消费者之一就会开始工作。 有没有办法让所有的用户同时运行,每个用户都从队列中处理不同的工作? 我的消费者代码如下:

def start_consumer(self, incoming_exchange_name):
    if self.rabbitmq_connection.connected:
        callbacks=[]
        queues=[]

        callbacks.append(self._callback)
        queues.append(self.incoming_queue)

        print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
        self.incoming_exchange(settings.rabbitmq_connection).declare()
        self.incoming_queue(settings.rabbitmq_connection).declare()

        with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks) as consumer:
            while True:
                try:
                    self.rabbitmq_connection.drain_events()
                except Exception as e:
                    print 'Error -> %s' % e.message 

您需要将使用者预取设置为1(https://kombu.readthedocs.org/en/latest/reference/kombu.transport.pyamqp.Connection.Channel.basic_qos),这样每个消费者只需抓取1条消息,并将剩下的其他消息放在状态准备就绪的队列中,因此如果您有2个QOS设置为1的消费者,并且您有100条消息,则您将处理2个同时执行的任务。

我已将缺少的部分添加到您的代码中,以设置预取计数

def start_consumer(self, incoming_exchange_name):
if self.rabbitmq_connection.connected:
    callbacks=[]
    queues=[]

    callbacks.append(self._callback)
    queues.append(self.incoming_queue)

    print 'opening a new *incoming* rabbitmq connection to the %s exchange for the %s queue' % (self.incoming_exchange.name, self.incoming_queue.name)
    self.incoming_exchange(settings.rabbitmq_connection).declare()
    self.incoming_queue(settings.rabbitmq_connection).declare()

    channel = self.rabbitmq_connection.channel()
    channel.basic_qos(prefetch_size=0, prefetch_count=1, a_global=False)

    with settings.rabbitmq_connection.Consumer(queues=queues, callbacks=callbacks, channel=channel) as consumer:
        while True:
            try:
                self.rabbitmq_connection.drain_events()
            except Exception as e:
                print 'Error -> %s' % e.message 

我想你基本上是想自己重写芹菜:

http://www.celeryproject.org/

除非你纯粹是为了学习的目的而做,否则不必担心疼痛并使用芹菜。 顺便说一下, kombuRabbitMQ恰恰是Celery用作后端的(更不用说Redis后端可用了,这在一些应用中为我节省了无数的精力)。

链接地址: http://www.djcxy.com/p/46361.html

上一篇: Python Kombu

下一篇: RabbitMQ by Example: Multiple Threads, Channels and Queues