RabbitMQ + kombu:写入/读取到一个
我是新来的消息交换工作人员,并遇到问题找到适当的任务手册。
我需要组织队列以便:
生产者创建一些随机空队列,并在那里写下所有消息(通常为100条消息)。
消费者找到非空和非锁定的队列并从中读取,直到它为空,然后删除它并查找下一个队列。
所以我的任务是将消息作为包处理,我知道如何在一个队列中使用相同的密钥生成和使用,但无法找到如何处理队列池。
我们可以有几个生产者和消费者并行运行,但不管他们中的哪一个发送给谁。 我们不需要也不能将特定的生产者与特定的消费者联系起来。
一般任务:我们有很多的客户端接收推送通知,我们通过一些参数对推送进行分组来处理,所以这样的分组应该在RabbitMQ中的一个队列中作为一个组被生成和使用,但是每个组都是独立的来自其他团体。
非常感谢Hannu的帮助:他的简单而强大的解决方案的关键思想是我们可以拥有一个具有已知名称的持久队列,其中生产者将编写已创建队列的名称,并且消费者将从那里读取这些名称。
为了让他的解决方案更加易读并且易于在将来进行我的个人任务,我将producer中的publish_data()分成了两个函数 - 一个是随机队列并将其写入control_queue,另一个接收此random_queue并填充消息。 对于消费者来说,类似的思想是有好处的 - 一个函数用来处理队列,另一个函数本身就会被调用。
我做了这样的事情,但与皮卡。 我必须清理并清除以前的代码片段。 它可能不是非常kombuish(这是我写的绝对第一个代码片断),但这是我将如何解决它。 基本上我会建立一个已知名称的控制队列。
发布者将为一组消息创建一个随机队列名称,将N个消息转储到它(在我的情况下编号为1-42),然后将队列名称发布到控制队列。 消费者然后接收该队列名称,绑定到它,读取消息,直到队列为空,然后删除队列。
这使得事情变得相对简单,因为发布者不需要找出他们被允许发布他们的数据组的位置(每个队列都是随机的新名称)。 接收者无需担心超时或“全部完成” - 消息,因为只有当一组数据已写入队列并且每条消息都在等待时,接收者才会收到队列名称。
也没有必要修补锁或信号或其他任何会使事情复杂化的东西。 您可以拥有尽可能多的消费者和生产者。 当然,使用交换和路由键可能会有不同的消费者用于不同的任务等。
出版者
from kombu import Connection
import uuid
from time import sleep
def publish_data(conn):
random_name= "q" + str(uuid.uuid4()).replace("-", "")
random_queue = conn.SimpleQueue(random_name)
for i in xrange(0, 42):
random_queue.put(i)
random_queue.close()
return random_name
with Connection('amqp://guest:guest@localhost:5672//') as conn:
control_queue = conn.SimpleQueue('control_queue')
_a = 0
while True:
y_name = publish_data(conn)
message = y_name
control_queue.put(message)
print('Sent: {0}'.format(message))
_a += 1
sleep(0.3)
if _a > 20:
break
control_queue.close()
消费者
from Queue import Empty
from kombu import Connection, Queue
def process_msg(foo):
print str(foo)
with Connection("amqp://guest:guest@localhost:5672//") as _conn:
sub_queue = _conn.SimpleQueue(str(foo))
while True:
try:
_msg = sub_queue.get(block=False)
print _msg.payload
_msg.ack()
except Empty:
break
sub_queue.close()
chan = _conn.channel()
dq = Queue(name=str(foo), exchange="")
bdq = dq(chan)
bdq.delete()
with Connection('amqp://guest:guest@localhost:5672//') as conn:
rec = conn.SimpleQueue('control_queue')
while True:
msg = rec.get(block=True)
entry = msg.payload
msg.ack()
process_msg(entry)
链接地址: http://www.djcxy.com/p/46369.html