Consuming a RabbitMQ queue from inside Python threads

This is a long one.

I have a list of usernames and passwords. For each one I want to log in to the account and do some things. I want to use several machines to do this faster. My plan is to have a main machine whose only job is a cron job that periodically checks whether the RabbitMQ queue is empty; if it is, it reads the list of usernames and passwords from a file and publishes them to the queue. Then a bunch of machines subscribe to that queue; each one receives a user/pass, does the work, acknowledges it, and moves on to the next one, until the queue is empty and the main machine fills it up again. So far I think I have everything down.
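The refill check on the main machine could be sketched as follows, assuming an already-open pika channel. A passive queue_declare asks RabbitMQ to report on an existing queue (the reply includes its message count) instead of creating it. The queue name 'hello' matches the tutorial code below, but refill_if_empty() and the "user password" body format are my own illustrative choices, not something from the question:

```python
def queue_is_empty(channel, queue_name='hello'):
    # passive=True: inspect the existing queue (raises if it doesn't exist)
    # rather than declaring it; the DeclareOk reply carries message_count.
    reply = channel.queue_declare(queue=queue_name, passive=True)
    return reply.method.message_count == 0

def refill_if_empty(channel, credentials):
    # credentials: iterable of (user, password) pairs read from the file
    if queue_is_empty(channel):
        for user, password in credentials:
            channel.basic_publish(exchange='',
                                  routing_key='hello',
                                  body='%s %s' % (user, password))
</imports>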

Now comes my problem. The work done for each user/pass isn't very intensive, so each machine could handle three of them simultaneously using Python's threading. In fact, for a single machine I have implemented this by loading the user/passes into a Python Queue() and having three threads consume that Queue(). Now I want to do something similar, but each thread on each machine should consume from a RabbitMQ queue instead of a Python Queue(). This is where I'm stuck. To run tests I started from RabbitMQ's tutorial.

send.py:

import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

message = ' '.join(sys.argv[1:])
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
connection.close()

worker.py:

import time, pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print ' [x] received %r' % (body,)
    time.sleep( body.count('.') )
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello', no_ack=False)
channel.start_consuming()

With the above you can run two copies of worker.py, which will subscribe to the RabbitMQ queue and consume messages as expected.

My threading without rabbitmq is something like this:

runit.py

import threading, Queue  # Queue is the Python 2 stdlib module

class Threaded_do_stuff(threading.Thread):
    def __init__(self, user_queue):
        threading.Thread.__init__(self)
        self.user_queue = user_queue

    def run(self):
        while True:
            login = self.user_queue.get()
            # 'pass' is a reserved word in Python, so it can't be a keyword argument
            do_stuff(user=login[0], password=login[1])
            self.user_queue.task_done()

user_queue = Queue.Queue()
for i in range(3):
    td = Threaded_do_stuff(user_queue)
    td.setDaemon(True)
    td.start()

## fill up the queue
for user in list_users:
    user_queue.put(user)

## go!
user_queue.join()

This also works as expected: you fill up the queue and have three threads subscribe to it. Now I want something like runit.py, but where each thread consumes from a RabbitMQ queue, as in worker.py, instead of a Python Queue().

Here's something I tried that didn't work (and I don't understand why):

rabbitmq_runit.py

import time, threading, pika

class Threaded_worker(threading.Thread):
    def callback(self, ch, method, properties, body):
        print ' [x] received %r' % (body,)
        time.sleep( body.count('.') )
        ch.basic_ack(delivery_tag = method.delivery_tag)

    def __init__(self):
        threading.Thread.__init__(self)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='hello')
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.callback, queue='hello')

    def run(self):
        print 'start consuming'
        self.channel.start_consuming()

for _ in range(3):
    print 'launch thread'
    td = Threaded_worker()
    td.setDaemon(True)
    td.start()

I would expect this to launch three threads, each of which is blocked by .start_consuming() and just sits there waiting for the RabbitMQ queue to send it something. Instead, this program starts, does some prints, and exits. The print pattern is weird too:

launch thread
launch thread
start consuming
launch thread
start consuming

In particular, notice that one "start consuming" is missing.

What's going on?

EDIT: One answer I found to a similar question is here: Consuming a rabbitmq message queue with multiple threads (Python Kombu), and the answer is "use celery", whatever that means. I don't buy it; I shouldn't need anything remotely as sophisticated as Celery. In particular, I'm not trying to set up an RPC and I don't need to read replies from the do_stuff routines.

EDIT 2: The print pattern I expected would be the following. I run

python send.py first message......
python send.py second message.
python send.py third message.
python send.py fourth message.

and the print pattern would be

launch thread
start consuming
 [x] received 'first message......'
launch thread
start consuming
 [x] received 'second message.'
launch thread
start consuming
 [x] received 'third message.'
 [x] received 'fourth message.'

The problem is that you're making the thread daemonic:

td = Threaded_worker()
td.setDaemon(True)  # Shouldn't do that.
td.start()

Daemonic threads will be terminated as soon as the main thread exits:

A thread can be flagged as a “daemon thread”. The significance of this flag is that the entire Python program exits when only daemon threads are left. The initial value is inherited from the creating thread. The flag can be set through the daemon property.

Leave out setDaemon(True) and you should see it behave the way you expect.
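To see the effect in isolation, here is a small stdlib-only sketch (my own illustration, not from the question); the time.sleep() stands in for the blocking start_consuming() call. A non-daemon thread, the default, keeps the interpreter alive until it returns, whereas flipping daemon to True reproduces the early exit described above:

```python
import threading
import time

seen = []

def consume():
    # stand-in for channel.start_consuming(): block briefly, then record work
    time.sleep(0.1)
    seen.append('message')

t = threading.Thread(target=consume)
# t.daemon is False by default: the interpreter will not exit while this
# thread is running. Setting t.daemon = True before start() would let the
# process terminate immediately, killing the thread mid-sleep.
t.start()
t.join()  # wait for the consumer before inspecting its work
print(seen)  # ['message']
```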

Also, the pika FAQ has a note about how to use it with threads:

Pika does not have any notion of threading in the code. If you want to use Pika with threading, make sure you have a Pika connection per thread, created in that thread. It is not safe to share one Pika connection across threads.

This suggests you should move everything you're doing in __init__() into run(), so that the connection is created in the same thread you're actually consuming from the queue in.
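Putting both points together, a sketch of the corrected consumer might look like this. It keeps the question's 'hello' queue and the old callback-first basic_consume signature (newer pika versions take an on_message_callback= argument instead); the launch code is guarded so it only runs as a script:

```python
import threading

class Threaded_worker(threading.Thread):
    """One consumer thread that owns its own pika connection."""

    def callback(self, ch, method, properties, body):
        print(' [x] received %r' % (body,))
        ch.basic_ack(delivery_tag=method.delivery_tag)

    def run(self):
        # Per the pika FAQ: one connection per thread, created in the
        # thread that uses it -- hence everything lives in run().
        import pika
        connection = pika.BlockingConnection(
            pika.ConnectionParameters('localhost'))
        channel = connection.channel()
        channel.queue_declare(queue='hello')
        channel.basic_qos(prefetch_count=1)
        channel.basic_consume(self.callback, queue='hello')
        print('start consuming')
        channel.start_consuming()

if __name__ == '__main__':
    workers = [Threaded_worker() for _ in range(3)]
    for w in workers:
        w.start()   # no setDaemon(True): the threads keep the process alive
    for w in workers:
        w.join()    # block the main thread on the (never-ending) consumers
```

Joining the non-daemonic threads makes the intent explicit: the main thread parks itself and the process lives as long as the consumers do.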
