Reactive Extensions: Process events in batches + add delay between every batch

I have an application which at some points raises 1000 events almost at the same time. What I would like to do is to batch the events to chunks of 50 items and start processing them every 10 seconds. There's no need to wait for a batch to complete before starting a new batch processing.

For example:

10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))

Any ideas how to achieve this? I suppose Reactive Extensions is the way to go but other solutions are acceptable too.

I tried to start from here:

        var bufferedItems = eventAsObservable
            .Buffer(15)
            .Delay(TimeSpan.FromSeconds(5)

But noticed that the delay didn't work as I hoped for and instead all the batches started simultaneously, though 5 seconds delayed.

I also tested the Window-method, but I didn't notice any difference in behavior. I suppose the TimeSpan in Window actually means that "take every event which happens in the next 10 seconds:

        var bufferedItems = eventAsObservable
            .Window(TimeSpan.FromSeconds(10), 5)
            .SelectMany(x => x)
            .Subscribe(DoProcessing);

I'm using the Rx-Main 2.0.20304-beta.


如果你不想睡觉线程,你可以这样做:

var tick = Observable.Interval(TimeSpan.FromSeconds(5));

eventAsObservable
.Buffer(50)
.Zip(tick, (res, _) => res)
.Subscribe(DoProcessing);

尝试这个:

    var bufferedItems = eventAsObservable
        .Buffer(50)
        .Do(_ => { Thread.Sleep(10000); });

有一个特定的缓冲区方法重载只是为此:https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx

observable.Buffer(TimeSpan.FromSeconds(5), 50);
链接地址: http://www.djcxy.com/p/67312.html

上一篇: 如何根据时间和计数进行缓冲,但如果没有事件发生,则停止定时器

下一篇: 反应式扩展:批量处理事件+在每批之间添加延迟