RabbitMQ / AMQP:单个队列,同一消息的多个消费者?

一般来说,我刚开始使用RabbitMQ和AMQP。

  • 我有一个消息队列
  • 我有多个消费者,我想用同样的信息做不同的事情。
  • 大部分RabbitMQ文档似乎都侧重于循环法,即单个消费者使用单个消息的情况下,负载分散在每个消费者之间。 这确实是我所见证的行为。

    举个例子:生产者有一个队列,并且每2秒发送一次消息:

    var amqp = require('amqp');
    var connection = amqp.createConnection({ host: "localhost", port: 5672 });
    var count = 1;
    
    connection.on('ready', function () {
      var sendMessage = function(connection, queue_name, payload) {
        var encoded_payload = JSON.stringify(payload);  
        connection.publish(queue_name, encoded_payload);
      }
    
      setInterval( function() {    
        var test_message = 'TEST '+count
        sendMessage(connection, "my_queue_name", test_message)  
        count += 1;
      }, 2000) 
    
    
    })
    

    这里有一位消费者:

    var amqp = require('amqp');
    var connection = amqp.createConnection({ host: "localhost", port: 5672 });
    connection.on('ready', function () {
      connection.queue("my_queue_name", function(queue){
        queue.bind('#'); 
        queue.subscribe(function (message) {
          var encoded_payload = unescape(message.data)
          var payload = JSON.parse(encoded_payload)
          console.log('Recieved a message:')
          console.log(payload)
        })
      })
    })
    

    如果我启动消费者两次, 我可以看到每个消费者正在消费轮流行为中的备用消息。 例如,我会在一个终端看到消息1,3,5,在另一个终端看到消息2,4,6

    我的问题是:

  • 我可以让每位消费者收到相同的消息吗? 也就是说,两个消费者都会得到消息1,2,3,4,5,6? 这在AMQP / RabbitMQ中称为什么? 它是如何配置的?

  • 这通常是否完成? 我是否应该将消息交换路线分成两个单独的队列,而不是单个消费者?


  • 我可以让每位消费者收到相同的消息吗? 也就是说,两个消费者都会得到消息1,2,3,4,5,6? 这在AMQP / RabbitMQ中称为什么? 它是如何配置的?

    不,如果消费者在同一队列中,则不行。 来自RabbitMQ的AMQP概念指南:

    重要的是要明白,在AMQP 0-9-1中,消息之间的消息负载均衡。

    这似乎意味着队列中的循环行为是给定的 ,而且是不可配置的。 也就是说,为了让多个消费者处理相同的消息ID,需要单独的队列。

    这通常是否完成? 我是否应该将消息交换路线分成两个单独的队列,而不是单个消费者?

    不,它不是,单个队列/多个消费者与每个消费者处理相同的消息ID是不可能的。 通过交换路由将消息分成两个独立的队列确实更好。

    由于我不需要太复杂的路由, 扇出交换将很好地处理这个问题。 由于node-amqp具有“默认交换”的概念,因此您可以直接将消息发布到连接,但是大多数AMQP消息都已发布到特定的交换机上,所以我之前没有将注意力放在Exchange上。

    这是我的粉丝交流,发送和接收:

    var amqp = require('amqp');
    var connection = amqp.createConnection({ host: "localhost", port: 5672 });
    var count = 1;
    
    connection.on('ready', function () {
      connection.exchange("my_exchange", options={type:'fanout'}, function(exchange) {   
    
        var sendMessage = function(exchange, payload) {
          console.log('about to publish')
          var encoded_payload = JSON.stringify(payload);
          exchange.publish('', encoded_payload, {})
        }
    
        // Recieve messages
        connection.queue("my_queue_name", function(queue){
          console.log('Created queue')
          queue.bind(exchange, ''); 
          queue.subscribe(function (message) {
            console.log('subscribed to queue')
            var encoded_payload = unescape(message.data)
            var payload = JSON.parse(encoded_payload)
            console.log('Recieved a message:')
            console.log(payload)
          })
        })
    
        setInterval( function() {    
          var test_message = 'TEST '+count
          sendMessage(exchange, test_message)  
          count += 1;
        }, 2000) 
     })
    })
    

    只需阅读rabbitmq教程。 你发布消息交换,而不是排队; 然后它被路由到适当的队列。 在你的情况下,你应该为每个消费者绑定单独的队列。 这样,他们可以完全独立地使用消息。


    最后几个答案几乎是正确的 - 我有大量的应用程序生成消息,最终需要不同的消费者,所以这个过程非常简单。

    如果您希望多个消费者使用相同的消息,请执行以下步骤。

    在每个队列属性中为每个要接收消息的应用程序创建多个队列,并使用amq.direct交换“绑定”一个路由标记。 更改发布应用以发送到amq.direct并使用路由标记(不是队列)。 然后AMQP将使用相同的绑定将消息复制到每个队列中。 作品魅力:)

    示例:假设我有一个生成的JSON字符串,我使用路由标记“new-sales-order”将它发布到“amq.direct”交换中,我有一个用于列印订单的order_printer应用程序队列,我有一个排队等候我的结算系统,它会发送一份订单副本并开具客户账单,并且我有一个网络归档系统,用于归档历史/合规性原因的订单,并且我有一个客户网络界面,跟踪其他信息的订单。订单。

    所以我的队列是:order_printer,order_billing,order_archive和order_tracking所有绑定标签“new-sales-order”绑定到它们,所有4个将获得JSON数据。

    这是发送数据的理想方式,无需发布应用程序知道或关心接收应用程序。

    链接地址: http://www.djcxy.com/p/34067.html

    上一篇: RabbitMQ / AMQP: single queue, multiple consumers for same message?

    下一篇: Implementing message priority in AMQP