反应式扩展:批量处理事件+在每批之间添加延迟

我有一个应用程序,它在某些时候几乎同时提出了1000个事件。 我想要做的是将事件批量分成50个项目,并开始每10秒处理一次。 在开始新的批处理之前,不需要等待批量完成。

例如:

10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))

任何想法如何实现这一目标? 我认为Reactive Extensions是可行的,但其他解决方案也是可以接受的。

我试着从这里开始:

        var bufferedItems = eventAsObservable
            .Buffer(15)
            .Delay(TimeSpan.FromSeconds(5)

但注意到拖延并没有按照我所希望的那样工作,而是所有批次都同时开始,但延迟了5秒。

我也测试了Window方法,但我没有注意到行为上的任何差异。 我猜想Window中的TimeSpan实际上意味着“在未来10秒内发生每一件事情:

        var bufferedItems = eventAsObservable
            .Window(TimeSpan.FromSeconds(10), 5)
            .SelectMany(x => x)
            .Subscribe(DoProcessing);

我使用Rx-Main 2.0.20304-beta。


如果你不想睡觉线程,你可以这样做:

var tick = Observable.Interval(TimeSpan.FromSeconds(5));

eventAsObservable
.Buffer(50)
.Zip(tick, (res, _) => res)
.Subscribe(DoProcessing);

尝试这个:

    var bufferedItems = eventAsObservable
        .Buffer(50)
        .Do(_ => { Thread.Sleep(10000); });

有一个特定的缓冲区方法重载只是为此:https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

observable.Buffer(TimeSpan.FromSeconds(5), 50);
链接地址: http://www.djcxy.com/p/67311.html

上一篇: Reactive Extensions: Process events in batches + add delay between every batch

下一篇: Is it possible to Observable.Buffer on something other than time