如何使用EasyNetQ同步消耗来自RabbitMQ的原始字节消息?
是否有任何方法可以同步使用EasyNetQ从RabbitMQ消耗原始字节消息?
我需要保证按顺序处理和确认来自未以EasyNetQ格式发布的系统的消息。 我知道消费者在单个线程上运行,但IAdvancedBus
接口只提供一种消耗原始消息的方法:
IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);
Task
返回类型意味着消费者正在异步运行回调,因此可能会按顺序处理消息。
如果没有,更改代码以支持这个想法的任何想法? 我会做接口方法:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);
并在RabbitAdvancedBus
实现它,但我不确定代码的准确位置。
这是个有趣的问题。 我本人不是EasyNetQ专家,也许别人会来,并给你一个更好的答案。 然而,我已经熟悉了EasyNetQ代码库大约一年,并且在我看来,了解接线消费者(以及因此调用消费者时)会发生什么情况是非常棘手的。
我首先想指出的是,仅仅通过更改方法的签名,并不能保证消息按顺序处理。 查看您的建议界面的实施示例:
IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
{
onMessage(bytes, properties, info);
return new Task(() => { });
};
Consume(queue, taskWrapper);
}
它调用原始的Consume
方法,我们并不真正知道之后会发生什么,对吧?
如果我在哪里在你的鞋子里,我会做以下事情之一:
Subscriber.cs
的同步实现(使用像AsyncEx这样的同步库)不应该很困难。 希望这可以帮助!
我收到了适用于EasyNetQ Google Group的回复:
要同步执行,你可以这样做:
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// do your synchronous work.....
return Task.CompletedTask;
});
或添加一个扩展名:
using System;
using System.Threading.Tasks;
using EasyNetQ;
using EasyNetQ.Consumer;
using EasyNetQ.Loggers;
using EasyNetQ.Topology;
namespace ConsoleApplication4
{
public static class RabbitAdvancedBusConsumeExtension
{
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
}
public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
{
return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
}
private static Task ExecuteSynchronously(Action action)
{
var tcs = new TaskCompletionSource<object>();
try
{
action();
tcs.SetResult(null);
}
catch (Exception e)
{
tcs.SetException(e);
}
return tcs.Task;
}
}
class Program
{
static void Main(string[] args)
{
var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));
var queue = bus.Advanced.QueueDeclare();
bus.Advanced.Consume(queue, (bytes, properties, info) =>
{
// .....
});
}
}
}
更新:此功能在版本0.52.0.410中添加:
https://github.com/EasyNetQ/EasyNetQ/pull/505
链接地址: http://www.djcxy.com/p/34209.html上一篇: How to synchronously consume raw byte messages from RabbitMQ using EasyNetQ?