RabbitMQ消费者设计多交换

我有以下配置的RabbitMQ设置。

  • 每个交易所都是FANOUT类型
  • 多个队列附加到每个Exchange。
  • BlockingConnection由消费者制作。
  • 单一消费者处理所有回调。
  • 问题 -

    某些有效负载比其他有效负载需要更长的时间来处理,这导致消费者即使在其他队列中有有效负载时也保持闲置状态。

    题 -

  • 我应该如何实施消费者以避免长时间等待? 我应该为每个模块运行单独的消费者吗? 任何用户体验?
  • 我可以配置RabbitMQ来处理这些情况吗? 如果是这样的话?

  • 首先,很高兴知道为什么你有一个以上的扇出交换? 你真的需要这个吗? 扇出交换向所有队列发送消息...

  • 只有更多的消费者。 从rabbitmq教程中查看这个例子。
  • 你并不需要明确地配置rabbitmq,所有的事情都可以由客户端(发布者和订阅者)来完成,你只需要弄清楚你需要多少交换机以及它们应该是哪种类型等等。

  • 首先,你使用什么编程语言? 最常见的语言,如python,java,c#,都支持为并行进程创建额外的线程。

    假设你使用下面的队列(pseu代码):

    def callback(ch, method, properties, body) ...
    def threaded_function(ch, method, properties, body) ...
    
    channel.basic_qos(prefetch_count=3)
    channel.basic_consume(callback, queue='task_queue')
    channel.start_consuming()
    

    首先,设置“prefetch_count = 3”允许您的消费者最多同时有3条消息处于非确认状态。

    在回调方法中,您应该启动一个线程来执行带有threaded_function的每条消息。 在threaded_function方法体的末尾,执行:

    ch.basic_ack(delivery_tag = method.delivery_tag)
    

    因此,最多可以同时处理3个消息,即使需要更长的时间才能运行一个或两个线程,其他线程仍可以处理下一个消息。

    链接地址: http://www.djcxy.com/p/46365.html

    上一篇: RabbitMQ Consumer Design for Multiple Exchange

    下一篇: Consuming rabbitmq queue from inside python threads