RabbitMQ:与Topic交换的持久消息
我对RabbitMQ很新。
我已经建立了一个'话题'交流。 消费者可能会在发布商之后启动。 我希望消费者能够收到在发送之前发送的消息,并且这些消息尚未被消费。
交易所设置了以下参数:
exchange_type => 'topic'
durable => 1
auto_delete => 0
passive => 0
消息使用此参数发布:
delivery_mode => 2
消费者使用get()从交换中检索消息。
不幸的是,任何客户端出现之前发布的任何消息都会丢失。 我使用了不同的组合。
我想我的问题是交换不能保存消息。 也许我需要在发布者和队列之间有一个队列。 但是,这似乎不适用于通过密钥路由消息的“主题”交换。
任何想法我应该如何继续。 我使用Perl绑定Net :: RabbitMQ(应该没关系)和RabbitMQ 2.2.0。
如果没有可用的连接使用者在发布时处理消息,则需要一个持久队列来存储消息。
交换不存储消息,但是队列可以。 令人困惑的部分是,交流可以被标记为“耐用”,但所有的真正的意思是,该交易所本身仍然会在那里,如果你重新启动你的经纪人,但是这并不意味着发送到该交换的任何消息都自动持久。
鉴于此,这里有两个选择:
我会去#1。 可能不需要执行很多步骤,并且始终可以编写所需的步骤,以便可以重复这些步骤。 此外,如果所有消费者都将从同一个队列中抽取(而不是每个队列都有一个专用队列),那么这确实是最小的管理开销。
队列是可以正确管理和控制的东西。 否则,你可能会最终与流氓消费者宣布持久队列,使用他们几分钟,但永远不会。 不久之后,你将拥有一个永不停息的队伍,不会有任何缩小规模,以及即将发生的经纪人启示。
正如Brian所提到的,交换不存储消息,主要负责将消息路由到其他交换机或队列。 如果交换没有绑定到一个队列,那么发送到该交换的所有消息都将“丢失”
您不需要在发布者脚本中声明固定的客户端队列,因为这可能无法扩展。 队列可由发布者动态创建,并使用交换 - 交换绑定进行内部路由。
RabbitMQ支持交换到交换绑定,这将允许拓扑的灵活性,解耦和其他好处。 您可以在RabbitMQ Exchange to Exchange Bindings [AMPQ]
RabbitMQ Exchange交换绑定
Python代码示例,如果没有消费者使用队列,则使用持久性创建交换 - 交换绑定。
#!/usr/bin/env python
import pika
import sys
connection = pika.BlockingConnection(pika.ConnectionParameters(
host='localhost'))
channel = connection.channel()
#Declares the entry exchange to be used by all producers to send messages. Could be external producers as well
channel.exchange_declare(exchange='data_gateway',
exchange_type='fanout',
durable=True,
auto_delete=False)
#Declares the processing exchange to be used.Routes messages to various queues. For internal use only
channel.exchange_declare(exchange='data_distributor',
exchange_type='topic',
durable=True,
auto_delete=False)
#Binds the external/producer facing exchange to the internal exchange
channel.exchange_bind(destination='data_distributor',source='data_gateway')
##Create Durable Queues binded to the data_distributor exchange
channel.queue_declare(queue='trade_db',durable=True)
channel.queue_declare(queue='trade_stream_service',durable=True)
channel.queue_declare(queue='ticker_db',durable=True)
channel.queue_declare(queue='ticker_stream_service',durable=True)
channel.queue_declare(queue='orderbook_db',durable=True)
channel.queue_declare(queue='orderbook_stream_service',durable=True)
#Bind queues to exchanges and correct routing key. Allows for messages to be saved when no consumer is present
channel.queue_bind(queue='orderbook_db',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='orderbook_stream_service',exchange='data_distributor',routing_key='*.*.orderbook')
channel.queue_bind(queue='ticker_db',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='ticker_stream_service',exchange='data_distributor',routing_key='*.*.ticker')
channel.queue_bind(queue='trade_db',exchange='data_distributor',routing_key='*.*.trade')
channel.queue_bind(queue='trade_stream_service',exchange='data_distributor',routing_key='*.*.trade')
链接地址: http://www.djcxy.com/p/34203.html