RabbitMQ消费者设计多交换
我有以下配置的RabbitMQ设置。
FANOUT
类型 BlockingConnection
由消费者制作。 问题 -
某些有效负载比其他有效负载需要更长的时间来处理,这导致消费者即使在其他队列中有有效负载时也保持闲置状态。
题 -
首先,很高兴知道为什么你有一个以上的扇出交换? 你真的需要这个吗? 扇出交换向所有队列发送消息...
首先,你使用什么编程语言? 最常见的语言,如python,java,c#,都支持为并行进程创建额外的线程。
假设你使用下面的队列(pseu代码):
def callback(ch, method, properties, body) ...
def threaded_function(ch, method, properties, body) ...
channel.basic_qos(prefetch_count=3)
channel.basic_consume(callback, queue='task_queue')
channel.start_consuming()
首先,设置“prefetch_count = 3”允许您的消费者最多同时有3条消息处于非确认状态。
在回调方法中,您应该启动一个线程来执行带有threaded_function的每条消息。 在threaded_function方法体的末尾,执行:
ch.basic_ack(delivery_tag = method.delivery_tag)
因此,最多可以同时处理3个消息,即使需要更长的时间才能运行一个或两个线程,其他线程仍可以处理下一个消息。
链接地址: http://www.djcxy.com/p/46365.html