在python线程中使用rabbitmq队列

这是一个很长的。

我有一个用户名和密码的列表。 对于每一个我想登录帐户和做一些事情。 我想用几台机器来更快地完成这个任务。 我想这样做的方式是有一台主机,其工作只是有一个cron,它不时检查rabbitmq队列是否为空。 如果是,请从文件中读取用户名和密码列表并将其发送到rabbitmq队列。 然后有一堆机器订购了那个工作正在接收用户/通行证的队列,在它上面做东西,确认它,然后继续下一个,直到队列为空,然后主机将它填满再次。 到目前为止,我认为我已经把所有东西都弄倒

现在出现我的问题。 我已经检查过,每个用户/通行证要做的事情并不是很密集,所以我可以让每台机器使用python的线程同时执行其中的三个。 实际上,对于单个机器,我已经实现了这一点,我将用户/通道加载到python Queue()中,然后让三个线程使用Queue()。 现在我想做类似的事情,但不是从python Queue()中消耗,每台机器的每个线程都应该从rabbitmq队列中消耗。 这是我卡住的地方。 要运行测试,我使用rabbitmq的教程开始。

send.py:

import pika, sys
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

message = ' '.join(sys.argv[1:])
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body=message)
connection.close()

worker.py

import time, pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    print ' [x] received %r' % (body,)
    time.sleep( body.count('.') )
    ch.basic_ack(delivery_tag = method.delivery_tag)

channel.basic_qos(prefetch_count=1)
channel.basic_consume(callback, queue='hello', no_ack=False)
channel.start_consuming()

对于上述,您可以运行两个worker.py,它将订阅rabbitmq队列并按预期使用。

我没有rabbitmq的线程是这样的:

runit.py

class Threaded_do_stuff(threading.Thread):
    def __init__(self, user_queue):
        threading.Thread.__init__(self)
        self.user_queue = user_queue

    def run(self):
        while True:
            login = self.user_queue.get()
            do_stuff(user=login[0], pass=login[1])
            self.user_queue.task_done()

user_queue = Queue.Queue()
for i in range(3):
    td = Threaded_do_stuff(user_queue)
    td.setDaemon(True)
    td.start()

## fill up the queue
for user in list_users:
    user_queue.put(user)

## go!
user_queue.join()

这也按预期工作:填满队列并有3个线程订阅它。 现在我想要做的就是像runit.py,但不是使用python Queue(),而是使用像worker.py这样的实际上是一个rabbitmq队列的东西。

这是我尝试过但没有用的东西(我不明白为什么)

rabbitmq_runit.py

import time, threading, pika

class Threaded_worker(threading.Thread):
    def callback(self, ch, method, properties, body):
        print ' [x] received %r' % (body,)
        time.sleep( body.count('.') )
        ch.basic_ack(delivery_tag = method.delivery_tag)

    def __init__(self):
        threading.Thread.__init__(self)
        self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
        self.channel = self.connection.channel()
        self.channel.queue_declare(queue='hello')
        self.channel.basic_qos(prefetch_count=1)
        self.channel.basic_consume(self.callback, queue='hello')

    def run(self):
        print 'start consuming'
        self.channel.start_consuming()

for _ in range(3):
    print 'launch thread'
    td = Threaded_worker()
    td.setDaemon(True)
    td.start()

我希望这会启动三个线程,每个线程都被.start_consuming()阻塞,它只是在那里等待rabbitmq队列发送它们。 相反,该程序启动,进行一些打印并退出。 存在的模式也很奇怪:

launch thread
launch thread
start consuming
launch thread
start consuming

特别要注意的是缺少一个“开始消费”。

这是怎么回事?

编辑:我发现一个类似的问题的答案是在这里消费与多线程(Python Kombu)rabbitmq消息队列和答案是“使用芹菜”,无论这意味着什么。 我不买它,我不应该需要任何像芹菜一样复杂的东西。 特别是,我不想设置RPC,也不需要读取do_stuff例程的回复。

编辑2:我期望的打印模式将是以下。 我做

python send.py first message......
python send.py second message.
python send.py third message.
python send.py fourth message.

并且打印模式会是

launch thread
start consuming
 [x] received 'first message......'
launch thread
start consuming
 [x] received 'second message.'
launch thread
start consuming
 [x] received 'third message.'
 [x] received 'fourth message.'

问题是你正在使线程守护进程:

td = Threaded_worker()
td.setDaemon(True)  # Shouldn't do that.
td.start()

一旦主线程退出,守护线程将立即终止:

线程可以被标记为“守护线程”。 这个标志的意义在于,只有守护进程线程退出时,整个Python程序才会退出。 初始值是从创建线程继承的。 该标志可以通过守护进程属性设置。

setDaemon(True) ,你会发现它的行为和你期望的一样。

此外,pika常见问题解答还提供了有关如何在线程中使用它的说明:

Pika没有任何线程代码的概念。 如果你想在线程中使用Pika,确保你在该线程中创建了每个线程的Pika连接。 在线程间共享一个Pika连接是不安全的。

这意味着你应该将__init__()所做的所有事情都移动到run() ,以便连接在你实际从队列中消耗的同一个线程中创建。

链接地址: http://www.djcxy.com/p/46363.html

上一篇: Consuming rabbitmq queue from inside python threads

下一篇: Python Kombu