如何使用EasyNetQ同步消耗来自RabbitMQ的原始字节消息?

是否有任何方法可以同步使用EasyNetQ从RabbitMQ消耗原始字节消息?

我需要保证按顺序处理和确认来自未以EasyNetQ格式发布的系统的消息。 我知道消费者在单个线程上运行,但IAdvancedBus接口只提供一种消耗原始消息的方法:

IDisposable Consume(IQueue queue, Func<byte[], MessageProperties, MessageReceivedInfo, Task> onMessage);

Task返回类型意味着消费者正在异步运行回调,因此可能会按顺序处理消息。

如果没有,更改代码以支持这个想法的任何想法? 我会做接口方法:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage);

并在RabbitAdvancedBus实现它,但我不确定代码的准确位置。


这是个有趣的问题。 我本人不是EasyNetQ专家,也许别人会来,并给你一个更好的答案。 然而,我已经熟悉了EasyNetQ代码库大约一年,并且在我看来,了解接线消费者(以及因此调用消费者时)会发生什么情况是非常棘手的。

我首先想指出的是,仅仅通过更改方法的签名,并不能保证消息按顺序处理。 查看您的建议界面的实施示例:

IDisposable Consume(IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
{
    Func<byte[], MessageProperties, MessageReceivedInfo, Task> taskWrapper = (bytes, properties, info) =>
    {
        onMessage(bytes, properties, info);
        return new Task(() => { });
    };
    Consume(queue, taskWrapper);
}

它调用原始的Consume方法,我们并不真正知道之后会发生什么,对吧?

如果我在哪里在你的鞋子里,我会做以下事情之一:

  • 使用官方的RabbitMq客户端并使用那里的消息(这不是那么棘手!)
  • 也许看看RawRabbit,这是RabbitMq上面的一个薄层,我一直致力于(使用vNext标准)。 它仅支持消费消息的异步签名,但编写Subscriber.cs的同步实现(使用像AsyncEx这样的同步库)不应该很困难。
  • 更改业务逻辑的建模。 我不确定这是否适用于您的情况,但通常情况下,如果每个消息都按正确的顺序处理是关键任务,则应该以某种方式对其进行建模,以便消耗方法可以验证此消息是否接下来一致。 (另外,我不认为EasyNetQ保证消息序列,所以你可能想要为每个新版本的框架验证它)。
  • 希望这可以帮助!


    我收到了适用于EasyNetQ Google Group的回复:

    要同步执行,你可以这样做:

    bus.Advanced.Consume(queue, (bytes, properties, info) =>
    {
        // do your synchronous work.....
        return Task.CompletedTask;
    });
    

    或添加一个扩展名:

    using System;
    using System.Threading.Tasks;
    using EasyNetQ;
    using EasyNetQ.Consumer;
    using EasyNetQ.Loggers;
    using EasyNetQ.Topology;
    
    namespace ConsoleApplication4
    {
        public static class RabbitAdvancedBusConsumeExtension
        {
           public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage)
        {
            return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)));
        }
    
        public static IDisposable Consume(this IAdvancedBus bus, IQueue queue, Action<byte[], MessageProperties, MessageReceivedInfo> onMessage, Action<IConsumerConfiguration> configure)
        {
            return bus.Consume(queue, (bytes, properties, info) => ExecuteSynchronously(() => onMessage(bytes, properties, info)), configure);
        }
    
        private static Task ExecuteSynchronously(Action action)
        {
            var tcs = new TaskCompletionSource<object>();
            try
            {
                action();
                tcs.SetResult(null);
            }
            catch (Exception e)
            {
                tcs.SetException(e);
            }
            return tcs.Task;
        }
    }
    
    class Program
    {
        static void Main(string[] args)
        {
            var bus = RabbitHutch.CreateBus("host=localhost", x => x.Register<IEasyNetQLogger>(s => new ConsoleLogger()));
    
            var queue = bus.Advanced.QueueDeclare();
            bus.Advanced.Consume(queue, (bytes, properties, info) =>
            {
                // .....
            });
        }
    }
    }
    

    更新:此功能在版本0.52.0.410中添加:

    https://github.com/EasyNetQ/EasyNetQ/pull/505

    链接地址: http://www.djcxy.com/p/34209.html

    上一篇: How to synchronously consume raw byte messages from RabbitMQ using EasyNetQ?

    下一篇: Does EasyNetQ support nack?