RabbitMQ:与Topic交换的持久消息

我对RabbitMQ很新。

我已经建立了一个'话题'交流。 消费者可能会在发布商之后启动。 我希望消费者能够收到在发送之前发送的消息,并且这些消息尚未被消费。

交易所设置了以下参数:

exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0

消息使用此参数发布:

delivery_mode => 2

消费者使用get()从交换中检索消息。

不幸的是,任何客户端出现之前发布的任何消息都会丢失。 我使用了不同的组合。

我想我的问题是交换不能保存消息。 也许我需要在发布者和队列之间有一个队列。 但是,这似乎不适用于通过密钥路由消息的“主题”交换。

任何想法我应该如何继续。 我使用Perl绑定Net :: RabbitMQ(应该没关系)和RabbitMQ 2.2.0。


如果没有可用的连接使用者在发布时处理消息,则需要一个持久队列来存储消息。

交换不存储消息,但是队列可以。 令人困惑的部分是,交流可以被标记为“耐用”,但所有的真正的意思是,该交易所本身仍然会在那里,如果你重新启动你的经纪人,但是这并不意味着发送到该交换的任何消息都自动持久。

鉴于此,这里有两个选择:

  • 在启动发布者之前执行管理步骤以自行创建队列。 您可以使用Web UI或命令行工具来执行此操作。 确保将其创建为一个持久队列,以便即使没有活动使用者,它也会存储路由到它的任何消息。
  • 假设你的消费者被编码为在启动时总是声明(并因此自动创建)他们的交换和队列(并且他们声明它们是持久的), 那么在启动任何发布者之前至少运行一次所有消费者 。 这将确保您的所有队列被正确创建。 然后,您可以关闭消费者,直到真正需要消费者为止,因为队列会持续存储路由给他们的未来消息。
  • 我会去#1。 可能不需要执行很多步骤,并且始终可以编写所需的步骤,以便可以重复这些步骤。 此外,如果所有消费者都将从同一个队列中抽取(而不是每个队列都有一个专用队列),那么这确实是最小的管理开销。

    队列是可以正确管理和控制的东西。 否则,你可能会最终与流氓消费者宣布持久队列,使用他们几分钟,但永远不会。 不久之后,你将拥有一个永不停息的队伍,不会有任何缩小规模,以及即将发生的经纪人启示。


    正如Brian所提到的,交换不存储消息,主要负责将消息路由到其他交换机或队列。 如果交换没有绑定到一个队列,那么发送到该交换的所有消息都将“丢失”

    您不需要在发布者脚本中声明固定的客户端队列,因为这可能无法扩展。 队列可由发布者动态创建,并使用交换 - 交换绑定进行内部路由。

    RabbitMQ支持交换到交换绑定,这将允许拓扑的灵活性,解耦和其他好处。 您可以在RabbitMQ Exchange to Exchange Bindings [AMPQ]

    RabbitMQ Exchange交换绑定

    示例拓扑

    Python代码示例,如果没有消费者使用队列,则使用持久性创建交换 - 交换绑定。

    #!/usr/bin/env python
    import pika
    import sys
    
    
    connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost'))
    channel = connection.channel()
    
    
    #Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
    channel.exchange_declare(exchange='data_gateway',
    exchange_type='fanout',
    durable=True,
    auto_delete=False)
    
    #Declares the processing exchange to be used.Routes messages to various queues. For internal use only
    channel.exchange_declare(exchange='data_distributor',
    exchange_type='topic',
    durable=True,
    auto_delete=False)
    
    #Binds the external/producer facing exchange to the internal exchange
    channel.exchange_bind(destination='data_distributor',source='data_gateway')
    
    ##Create Durable Queues binded to the data_distributor exchange
    channel.queue_declare(queue='trade_db',durable=True)
    channel.queue_declare(queue='trade_stream_service',durable=True)
    channel.queue_declare(queue='ticker_db',durable=True)
    channel.queue_declare(queue='ticker_stream_service',durable=True)
    channel.queue_declare(queue='orderbook_db',durable=True)
    channel.queue_declare(queue='orderbook_stream_service',durable=True)
    
    #Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
    channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
    channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
    channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
    channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
    channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
    channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')
    
    链接地址: http://www.djcxy.com/p/34203.html

    上一篇: RabbitMQ: persistent message with Topic exchange

    下一篇: AMQP 1.0 Library for Python