How to synchronously consume raw byte messages from RabbitMQ using EasyNetQ?

Is there any way to synchronously consume raw byte messages from RabbitMQ using EasyNetQ?

I need to guarantee in-order processing and acking of messages coming from a system that does not publish in EasyNetQ format. I know the consumer runs on a single thread, but the IAdvancedBus interface only offers one method to consume raw messages:

IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

The Task return type means that the consumer is running the callback asynchronously and hence may process the messages out of order.

If not, any ideas for changing the code to support this? I would make the interface method:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);

and implement it in RabbitAdvancedBus, but I am not sure exactly where the code would go.


This is an interesting question. I'm not an EasyNetQ expert myself, and perhaps someone else will come along and give you a better answer. However, I've been familiar with the EasyNetQ code base for about a year, and in my opinion it is tricky to get a sense of what happens when a consumer is wired up (and hence of when the consumer is invoked).

I would first like to point out that just by changing the signature of the method, it is not guaranteed that the messages are processed in order. Look for example at this implementation of your suggested interface:

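IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
    // Adapt the synchronous callback to the Task-returning signature.
    Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
    {
        onMessage(bytes, properties, info);
        // Return an already-completed task; a task created with "new Task(...)" is never started and would never complete.
        return Task.CompletedTask;
    };
    // Delegate to the original Task-based overload and hand its subscription handle back.
    return Consume(queue, taskWrapper);
}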

It calls the original Consume method, and we don't really know what happens after that, right?
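To make that concrete, here is a minimal sketch that does not touch EasyNetQ at all. The two dispatchers are hypothetical stand-ins for "whatever the library does with the callback"; I am not claiming either one is what EasyNetQ actually does. The point is only that the ordering is decided by the code that invokes the callback, not by whether the callback returns a Task.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class DispatchOrderDemo
{
    // Wraps a synchronous callback in the Task-returning shape, doing the work inline.
    public static Func<int, Task> Wrap(Action<int> onMessage)
    {
        return message =>
        {
            onMessage(message);
            return Task.CompletedTask; // already completed, nothing left to wait for
        };
    }

    // Hypothetical dispatcher A: delivers one message at a time and waits for each Task.
    public static void DispatchSequentially(IEnumerable<int> messages, Func<int, Task> handler)
    {
        foreach (var message in messages)
        {
            handler(message).GetAwaiter().GetResult();
        }
    }

    // Hypothetical dispatcher B: hands every message to the thread pool at once.
    public static void DispatchConcurrently(IEnumerable<int> messages, Func<int, Task> handler)
    {
        var running = messages.Select(message => Task.Run(() => handler(message))).ToArray();
        Task.WaitAll(running);
    }
}
```

With DispatchSequentially, a wrapped callback that appends to a list sees the messages in delivery order. With DispatchConcurrently the same synchronous callback can run in any order (and would need a thread-safe collection), even though nothing about its signature changed.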

If I were in your shoes, I would do one of the following:

  • Use the official RabbitMQ client and consume the messages from there (it is not that tricky!)
  • Maybe have a look at RawRabbit, a thin layer above RabbitMQ that I've been contributing to (using vNext standards). It only supports async signatures for consuming messages, but it shouldn't be too difficult to write a synchronous implementation of Subscriber.cs (using a sync library like AsyncEx).
  • Change the modelling of the business logic. I'm not sure if this is applicable in your case, but in general, if it is mission critical that every message is processed in the correct order, you should model it so that the consume method can verify that this message is next in line. (Besides, I don't think EasyNetQ guarantees message ordering, so you would probably want to verify that for each new version of the framework.)

Hope this helps!
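As a rough illustration of the "verify that this message is next in line" idea, here is a small sketch. It assumes the publisher stamps each message with a sequence number that increases by one (for example in a header); nothing in the question says the external system does that, so treat it as an assumption. SequenceGuard is a hypothetical helper, not part of EasyNetQ or the RabbitMQ client.

```csharp
using System;

// Hypothetical guard: remembers the last sequence number handled and
// accepts a message only when it is exactly the next one.
public sealed class SequenceGuard
{
    private readonly object _gate = new object();
    private long _lastSeen;

    public SequenceGuard(long lastSeen = 0)
    {
        _lastSeen = lastSeen;
    }

    // The most recently accepted sequence number.
    public long LastSeen
    {
        get { lock (_gate) { return _lastSeen; } }
    }

    // Returns true and advances when 'sequence' is the next expected number;
    // returns false for gaps, duplicates and replays, leaving the state unchanged.
    public bool TryAccept(long sequence)
    {
        lock (_gate)
        {
            if (sequence != _lastSeen + 1)
            {
                return false;
            }
            _lastSeen = sequence;
            return true;
        }
    }
}
```

Inside the consume callback you would read the sequence number from the message, call TryAccept, and only process and ack when it returns true. What to do on false (requeue, park the message, raise an alert) depends on the business rules.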


    I received a working response in the EasyNetQ Google Group:

    To execute synchronously you can do this:

    bus.Advanced.Consume(queue, (bytes, properties, info) =>
    {
        // do your synchronous work.....
        return Task.CompletedTask;
    });
    

    or add an extension:

    using System;
    using System.Threading.Tasks;
    using EasyNetQ;
    using EasyNetQ.Consumer;
    using EasyNetQ.Loggers;
    using EasyNetQ.Topology;
    
    namespace ConsoleApplication4
    {
        public static class RabbitAdvancedBusConsumeExtension
        {
            public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
            {
                return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
            }
    
            public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
            {
                return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
            }
    
            private static Task ExecuteSynchronously(Action action)
            {
                var tcs = new TaskCompletionSource<object>();
                try
                {
                    action();
                    tcs.SetResult(null);
                }
                catch (Exception e)
                {
                    tcs.SetException(e);
                }
                return tcs.Task;
            }
        }
    
        class Program
        {
            static void Main(string[] args)
            {
                var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));
    
                var queue = bus.Advanced.QueueDeclare();
                bus.Advanced.Consume(queue, (bytes, properties, info) =>
                {
                    // .....
                });
            }
        }
    }
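One difference between the two variants above is worth spelling out. With the plain lambda that returns Task.CompletedTask, an exception in the synchronous work escapes the callback directly; with the ExecuteSynchronously helper it is captured in a faulted Task instead. The sketch below copies the helper into a standalone class (SyncTaskDemo is a hypothetical name) so it compiles without EasyNetQ. How EasyNetQ reacts to either case is not something I have verified.

```csharp
using System;
using System.Threading.Tasks;

public static class SyncTaskDemo
{
    // Same shape as the ExecuteSynchronously helper in the extension class,
    // made public here so it can be called on its own.
    public static Task ExecuteSynchronously(Action action)
    {
        var tcs = new TaskCompletionSource<object>();
        try
        {
            action();            // runs inline, on the calling thread
            tcs.SetResult(null); // success: the task is already completed
        }
        catch (Exception e)
        {
            tcs.SetException(e); // failure: the task is faulted, nothing is thrown here
        }
        return tcs.Task;
    }
}
```

Calling SyncTaskDemo.ExecuteSynchronously with an action that succeeds yields a task whose Status is RanToCompletion; with an action that throws, the call itself does not throw and the returned task has IsFaulted set, with the original exception as the InnerException of task.Exception.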
    

    UPDATE: This functionality was added in release 0.52.0.410:

    https://github.com/EasyNetQ/EasyNetQ/pull/505
