没有订阅者的RabbitMQ队列

“持久”和“持久模式”似乎与重新启动有关,而不涉及没有订户接收消息。

当没有订户时,我希望RabbitMQ将消息保留在队列中。 当用户上线时,该用户应收到该消息。 RabbitMQ可以吗?

代码示例:

服务器:

namespace RabbitEg
{
    class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        static void Main(string[] args)
        {
            ConnectionFactory cnFactory = new RabbitMQ.Client.ConnectionFactory() { HostName = "localhost" };

            using (IConnection cn = cnFactory.CreateConnection())
            {
                using (IModel channel = cn.CreateModel())
                {
                    //channel.ExchangeDelete(EXCHANGE_NAME);
                    channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true);
                    //channel.BasicReturn += new BasicReturnEventHandler(channel_BasicReturn);

                    for (int i = 0; i < 100; i++)
                    {
                        byte[] payLoad = Encoding.ASCII.GetBytes("hello world _ " + i);
                        IBasicProperties channelProps = channel.CreateBasicProperties();
                        channelProps.SetPersistent(true);

                        channel.BasicPublish(EXCHANGE_NAME, "routekey_helloworld", false, false, channelProps, payLoad);

                        Console.WriteLine("Sent Message " + i);
                        System.Threading.Thread.Sleep(25);
                    }

                    Console.ReadLine();
                }
            }
        }
    }
}

客户:

namespace RabbitListener
{
    class Program
    {
        private const string EXCHANGE_NAME = "helloworld";

        static void Main(string[] args)
        {
            ConnectionFactory cnFactory = new ConnectionFactory() { HostName = "localhost" };

            using (IConnection cn = cnFactory.CreateConnection())
            {
                using (IModel channel = cn.CreateModel())
                {
                    channel.ExchangeDeclare(EXCHANGE_NAME, "direct", true);

                    string queueName = channel.QueueDeclare("myQueue", true, false, false, null);
                    channel.QueueBind(queueName, EXCHANGE_NAME, "routekey_helloworld");

                    Console.WriteLine("Waiting for messages");

                    QueueingBasicConsumer consumer = new QueueingBasicConsumer(channel);
                    channel.BasicConsume(queueName, true, consumer);

                    while (true)
                    {
                        BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
                        Console.WriteLine(Encoding.ASCII.GetString(e.Body));
                    }
                }
            }
        }
    }
}

请参阅AMQP参考,了解durablepersistent均值的解释。

基本上,队列要么durable要么non-durable 。 前者经纪人重新生存,后者不。

消息以transientpersistent 。 这个想法是persistent的消息durable队列也应该生存代理重新启动。

所以,为了得到你想要的,你需要1)声明队列是durable ,2)将消息发布为persistent 。 另外,您可能还希望在频道上启用发布商确认; 这样,你就会知道经纪人何时承担了该消息的责任。

链接地址: http://www.djcxy.com/p/59231.html

上一篇: RabbitMQ Queue with no subscribers

下一篇: "utime" is not exported by the Time::HiRes module