How to synchronously consume raw byte messages from RabbitMQ using EasyNetQ?
Is there any way to synchronously consume raw byte messages from RabbitMQ using EasyNetQ?
I need to guarantee in-order processing and acking of messages coming from a system that does not publish in EasyNetQ format. I know the consumer runs on a single thread, but the IAdvancedBus interface only offers one method to consume raw messages:
IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);
The Task return type means that the consumer is running the callback asynchronously and hence may process the messages out of order.
If not, any ideas for changing the code to support this? I would make the interface method:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);
and implement it in RabbitAdvancedBus, but I am not sure where the code would go exactly.
This is an interesting question. I'm not an EasyNetQ expert myself, and perhaps someone else will come along and give you a better answer. However, I've been familiar with the EasyNetQ code base for about a year, and in my opinion it is tricky to get a sense of what's happening when wiring up a consumer (and hence when the consumer is being invoked).
I would first like to point out that just by changing the signature of the method, it is not guaranteed that the messages are processed in order. Look for example at this implementation of your suggested interface:
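IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
    Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
    {
        onMessage(bytes, properties, info);
        // Return an already completed task; a task created with "new Task(...)" is never started and so never completes.
        return Task.CompletedTask;
    };
    // Hand the wrapper to the existing Task-based overload and pass its IDisposable back to the caller.
    return Consume(queue, taskWrapper);
}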
It calls the original Consume method, and we don't really know what happens after that, right?
If I were in your shoes I would do one of the following things:
Subscriber.cs
(using a sync library like AsyncEx). Hope this helps!
I received a response that works in the EasyNetQ Google Group:
To execute synchronously you can do this:
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
    // do your synchronous work.....
    return Task.CompletedTask;
});
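To illustrate why this pattern keeps the work in order, here is a minimal sketch that does not use EasyNetQ at all. `Dispatch` is a hypothetical stand-in for a single-threaded consumer loop (an assumption for illustration, not EasyNetQ's actual dispatcher), and `OrderingSketch` and `RunSynchronousHandler` are made-up names. Because the handler finishes all of its work before it returns the completed task, the next message cannot start until the previous one is done, even though this loop never awaits anything between messages.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class OrderingSketch
{
    // Hypothetical consumer loop: calls the handler once per message on the
    // calling thread and does NOT await the returned task between messages.
    public static Task Dispatch(IEnumerable<byte[]> messages, Func<byte[], Task> onMessage)
    {
        var pending = new List<Task>();
        foreach (var body in messages)
        {
            pending.Add(onMessage(body));
        }
        return Task.WhenAll(pending);
    }

    public static List<int> RunSynchronousHandler()
    {
        var processed = new List<int>();
        var messages = new[] { new byte[] { 1 }, new byte[] { 2 }, new byte[] { 3 } };

        Dispatch(messages, body =>
        {
            // All of the work happens here, before the task is handed back.
            processed.Add(body[0]);
            return Task.CompletedTask;
        }).Wait();

        return processed;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", RunSynchronousHandler())); // prints "1,2,3"
    }
}
```

The guarantee only covers work done before the handler returns. If the handler started asynchronous work and returned an incomplete task, ordering would depend on whether the real dispatcher waits for that task.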
or add an extension:
using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;

namespace ConsoleApplication4
{
    public static class RabbitAdvancedBusConsumeExtension
    {
        public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
        {
            return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
        }

        public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
        {
            return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
        }

        private static Task ExecuteSynchronously(Action action)
        {
            var tcs = new TaskCompletionSource<object>();
            try
            {
                action();
                tcs.SetResult(null);
            }
            catch (Exception e)
            {
                tcs.SetException(e);
            }
            return tcs.Task;
        }
    }

    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));
            var queue = bus.Advanced.QueueDeclare();
            bus.Advanced.Consume(queue, (bytes, properties, info) =>
            {
                // .....
            });
        }
    }
}
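For anyone wondering what the ExecuteSynchronously helper buys over returning a completed task directly, the following stand-alone sketch may help. It copies the helper into a hypothetical class named `ExecuteSynchronouslySketch` (not part of EasyNetQ) so that it compiles without the library. It shows that the action has already run by the time the task is returned, and that an exception thrown by the action ends up as a faulted task instead of propagating out of the callback.

```csharp
using System;
using System.Threading.Tasks;

public static class ExecuteSynchronouslySketch
{
    // Same shape as the helper in the extension class, duplicated here so the
    // sketch is self-contained.
    public static Task ExecuteSynchronously(Action action)
    {
        var tcs = new TaskCompletionSource<object>();
        try
        {
            action();
            tcs.SetResult(null);
        }
        catch (Exception e)
        {
            tcs.SetException(e);
        }
        return tcs.Task;
    }

    public static void Main()
    {
        var ran = false;
        Task ok = ExecuteSynchronously(() => ran = true);
        Console.WriteLine(ran);            // True: the action ran before the task came back
        Console.WriteLine(ok.IsCompleted); // True

        Task failed = ExecuteSynchronously(() => { throw new InvalidOperationException("boom"); });
        Console.WriteLine(failed.IsFaulted);                        // True
        Console.WriteLine(failed.Exception.InnerException.Message); // boom
    }
}
```

How a faulted task is then handled (for example, whether the message is acked or sent to an error queue) is up to the consumer that receives it; this sketch makes no claim about that.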
UPDATE: This functionality was added in release 0.52.0.410:
https://github.com/EasyNetQ/EasyNetQ/pull/505