RabbitMQ Consumer Design for Multiple Exchanges
I have a RabbitMQ setup with the following configuration:
the exchanges are of type FANOUT;
a BlockingConnection is made by the consumer.
Problem -
Some payloads take longer to process than others, which leaves the consumer idle even when there are payloads waiting in other queues.
Question -
First, it would be nice to know why you have more than one fanout exchange. Do you really need this? A fanout exchange sends messages to all queues...
First, what programming language are you using? Most common languages, such as Python, Java, and C#, support creating additional threads for parallel processing.
Let's say you consume the queue like below (pseudocode):
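def callback(ch, method, properties, body): ...  # starts one thread per message
def threaded_function(ch, method, properties, body): ...  # processes one message, then acks it
channel.basic_qos(prefetch_count=3)
channel.basic_consume(queue='task_queue', on_message_callback=callback)  # pika >= 1.0 form; older pika took the callback first
channel.start_consuming()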
First, setting prefetch_count=3 allows your consumer to hold at most 3 unacknowledged messages at any one time.
In the callback method, start a thread that runs threaded_function for each message. At the end of the threaded_function body, do:
ch.basic_ack(delivery_tag = method.delivery_tag)
This way, at most 3 messages are processed concurrently. Even if one or two of the threads take longer to run, the remaining ones can still pick up the next messages.
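The steps above can be sketched in Python with pika roughly as follows. This is a minimal sketch under assumptions, not a definitive implementation: the broker host "localhost", the process function, and the make_callback and ack_message helpers are hypothetical names introduced for illustration. One deliberate difference from the description: pika's BlockingConnection is not thread-safe, so instead of calling basic_ack directly from the worker thread, the sketch hands the ack back to the connection's own thread with add_callback_threadsafe.

```python
import functools
import threading

PREFETCH_COUNT = 3  # at most 3 unacknowledged messages, as in the answer


def process(body):
    """Hypothetical placeholder for the real, possibly slow, work."""
    return body


def ack_message(ch, delivery_tag):
    # Runs on the connection's thread; skip the ack if the channel has closed.
    if ch.is_open:
        ch.basic_ack(delivery_tag=delivery_tag)


def threaded_function(connection, ch, method, properties, body):
    process(body)
    # BlockingConnection is not thread-safe, so schedule the ack on the
    # connection's thread instead of acking from this worker thread.
    connection.add_callback_threadsafe(
        functools.partial(ack_message, ch, method.delivery_tag)
    )


def make_callback(connection):
    """Build a consumer callback that starts one worker thread per message."""
    def callback(ch, method, properties, body):
        worker = threading.Thread(
            target=threaded_function,
            args=(connection, ch, method, properties, body),
        )
        worker.start()
        return worker  # returned only for convenience; pika ignores it
    return callback


def main():
    import pika  # third-party: pip install pika (1.x assumed)

    # Assumption: broker on localhost and a queue named 'task_queue'.
    connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
    channel = connection.channel()
    channel.basic_qos(prefetch_count=PREFETCH_COUNT)
    channel.basic_consume(
        queue="task_queue", on_message_callback=make_callback(connection)
    )
    channel.start_consuming()
```

Calling main() starts the consumer. Because the broker stops delivering once 3 messages are unacknowledged, the prefetch count also caps the number of worker threads alive at the same time.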