Reactive Extensions: Process events in batches + add delay between every batch
I have an application which at some points raises 1000 events almost at the same time. What I would like to do is to batch the events to chunks of 50 items and start processing them every 10 seconds. There's no need to wait for a batch to complete before starting a new batch processing.
For example:
10:00:00: 10000 new events received
10:00:00: StartProcessing (events.Take(50))
10:00:10: StartProcessing (events.Skip(50).Take(50))
10:00:15: StartProcessing (events.Skip(100).Take(50))
Any ideas how to achieve this? I suppose Reactive Extensions is the way to go but other solutions are acceptable too.
I tried to start from here:
var bufferedItems = eventAsObservable
.Buffer(15)
.Delay(TimeSpan.FromSeconds(5)
But noticed that the delay didn't work as I hoped for and instead all the batches started simultaneously, though 5 seconds delayed.
I also tested the Window-method, but I didn't notice any difference in behavior. I suppose the TimeSpan in Window actually means that "take every event which happens in the next 10 seconds:
var bufferedItems = eventAsObservable
.Window(TimeSpan.FromSeconds(10), 5)
.SelectMany(x => x)
.Subscribe(DoProcessing);
I'm using the Rx-Main 2.0.20304-beta.
如果你不想睡觉线程,你可以这样做:
var tick = Observable.Interval(TimeSpan.FromSeconds(5));
eventAsObservable
.Buffer(50)
.Zip(tick, (res, _) => res)
.Subscribe(DoProcessing);
尝试这个:
var bufferedItems = eventAsObservable
.Buffer(50)
.Do(_ => { Thread.Sleep(10000); });
有一个特定的缓冲区方法重载只是为此:https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx
observable.Buffer(TimeSpan.FromSeconds(5), 50);
链接地址: http://www.djcxy.com/p/67312.html