RabbitMQ Consumer Design for Multiple Exchanges

I have a RabbitMQ setup with the following configuration:

  • Each exchange is of type FANOUT.
  • Multiple queues are attached to each exchange.
  • The consumer connects using a BlockingConnection.
  • A single consumer handles all callbacks.
Problem:

    Some payloads take longer to process than others, which leaves the consumer unable to pick up payloads that are waiting in other queues.

Questions:

  • How should I implement the consumer to avoid long waits? Should I run a separate consumer for each module? Does anyone have experience with this?
  • Can I configure RabbitMQ to handle these situations? If so, how?

  • First, it would be nice to know why you have more than one fanout exchange. Do you really need this? A fanout exchange sends every message to all queues bound to it.

  • Just have more consumers. Check this example from the RabbitMQ tutorial.
  • You don't really need to configure RabbitMQ explicitly. Everything can be done in the clients (publishers and subscribers); you just need to figure out how many exchanges you need and which type each should be.
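As a rough illustration of the "more consumers" suggestion, the sketch below starts one blocking consumer per queue, each on its own connection and thread. It assumes the pika 1.x client (the question mentions BlockingConnection but does not name the library); `connect`, `consume_queue`, `start_consumers`, and the queue-to-handler mapping are hypothetical names introduced here, not part of the original setup.

```python
import threading


def consume_queue(connect, queue_name, handler):
    """Run one blocking consumer for a single queue on its own connection.

    `connect` is a hypothetical zero-argument factory that returns a
    pika-style BlockingConnection; each thread gets a separate connection.
    """
    connection = connect()
    channel = connection.channel()
    # Take one message at a time so a slow payload only delays its own queue.
    channel.basic_qos(prefetch_count=1)

    def on_message(ch, method, properties, body):
        handler(body)
        # Acknowledge on the same thread that owns the connection.
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # Keyword names follow pika 1.x (an assumption about the client version).
    channel.basic_consume(queue=queue_name, on_message_callback=on_message)
    channel.start_consuming()


def start_consumers(connect, handlers):
    """Start one consumer thread per (queue name -> handler) entry."""
    threads = []
    for queue_name, handler in handlers.items():
        t = threading.Thread(
            target=consume_queue,
            args=(connect, queue_name, handler),
            daemon=True,
        )
        t.start()
        threads.append(t)
    return threads
```

With pika installed, `connect` could be something like `lambda: pika.BlockingConnection(pika.ConnectionParameters('localhost'))`. Running each consumer as a separate process instead of a thread is an equally valid variation of the same idea.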

  • First, what programming language are you using? Most common languages, such as Python, Java, and C#, support creating additional threads for parallel processing.

    Let's say you consume the queue like below (pseudocode):

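    import threading

    def threaded_function(ch, method, properties, body): ...  # process one message, then ack it

    def callback(ch, method, properties, body):
        # Hand each delivery to its own thread so the consume loop is not blocked.
        threading.Thread(target=threaded_function, args=(ch, method, properties, body)).start()

    channel.basic_qos(prefetch_count=3)  # at most 3 unacknowledged messages at a time
    # Argument order below is pika 0.x; pika 1.x expects basic_consume(queue='task_queue', on_message_callback=callback)
    channel.basic_consume(callback, queue='task_queue')
    channel.start_consuming()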
    

    First, setting prefetch_count=3 allows your consumer to hold at most 3 unacknowledged messages at a time.

    In the callback method, start a thread that runs threaded_function for each message. At the end of the threaded_function body, acknowledge the message:

    ch.basic_ack(delivery_tag = method.delivery_tag)
    

    This way, up to 3 messages can be processed concurrently. Even if one or two of the threads take longer to run, the remaining ones can still pick up the next messages.
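A slightly fuller sketch of the threaded approach above, assuming the pika 1.x client. One caveat the pseudocode glosses over: pika's BlockingConnection is not thread-safe, so this version hands the ack back to the connection's own thread with `add_callback_threadsafe` instead of calling `basic_ack` directly from the worker. The names `ack_message`, `do_work`, `make_callback`, `main`, and the `process` handler are hypothetical; the host and queue name are placeholders.

```python
import functools
import threading


def ack_message(ch, delivery_tag):
    # Runs on the connection's thread; skip the ack if the channel has closed.
    if ch.is_open:
        ch.basic_ack(delivery_tag=delivery_tag)


def do_work(connection, ch, delivery_tag, body, process):
    # Worker thread: run the (possibly slow) handler, then schedule the ack
    # on the thread that owns the connection.
    process(body)
    connection.add_callback_threadsafe(
        functools.partial(ack_message, ch, delivery_tag)
    )


def make_callback(connection, process, threads):
    """Build an on-message callback that runs `process` in a new thread."""
    def on_message(ch, method, properties, body):
        t = threading.Thread(
            target=do_work,
            args=(connection, ch, method.delivery_tag, body, process),
        )
        t.start()
        threads.append(t)
    return on_message


def main(process, queue_name="task_queue", host="localhost"):
    import pika  # third-party; imported here so the helpers above need no broker

    connection = pika.BlockingConnection(pika.ConnectionParameters(host))
    channel = connection.channel()
    # At most 3 unacknowledged messages, hence at most 3 worker threads busy.
    channel.basic_qos(prefetch_count=3)
    threads = []
    channel.basic_consume(
        queue=queue_name,
        on_message_callback=make_callback(connection, process, threads),
    )
    try:
        channel.start_consuming()
    except KeyboardInterrupt:
        channel.stop_consuming()
    for t in threads:
        t.join()
    connection.close()
```

Calling `main(handle_payload)` with your own handler function would start the consumer. Because the broker stops delivering once 3 messages are unacknowledged, prefetch_count also acts as the cap on concurrent worker threads.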
