Is it possible to use Observable.Buffer on something other than time?

I've been looking for examples of how to use Observable.Buffer in Rx but can't find anything more substantial than boilerplate time-buffered examples.

There does seem to be an overload to specify a "bufferClosingSelector" but I can't wrap my mind around it.

What I'm trying to do is create a sequence that buffers by time or by an "accumulation". Consider a request stream where every request has some sort of weight, and I do not want to process more than x accumulated weight at a time; if not enough has accumulated, just give me what has come in during the last timeframe (the regular Buffer functionality).


bufferClosingSelector is a function that is called each time a new buffer opens. It returns an observable, and the buffer is closed when that observable produces a value.

For example,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1))) works like the regular Buffer(time) overload.

If you want to weight a sequence, you can apply a Scan over the sequence and then decide on your aggregating condition.

E.g., source.Scan((a,c) => a + c).SkipWhile(a => a < 100) gives you a sequence which produces a value once the source sequence has added up to at least 100.
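To make the Scan/SkipWhile behaviour concrete, here is a minimal sketch. It assumes the System.Reactive NuGet package is referenced; the input numbers are made up for illustration.

```csharp
using System;
using System.Reactive.Linq;

public static class ScanDemo
{
    public static void Main()
    {
        // Hypothetical weights, chosen only for illustration.
        var weights = new[] { 30, 40, 50, 10 }.ToObservable();

        weights
            .Scan((a, c) => a + c)      // running totals: 30, 70, 120, 130
            .SkipWhile(a => a < 100)    // drop totals until one reaches 100
            .Subscribe(total => Console.WriteLine(total));
    }
}
```

Only the first value that gets through matters to Buffer, since that is the notification that closes the current buffer.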

You can use Amb to race these two closing conditions to see which reacts first:

        source.Buffer(() => Observable.Amb
                     (
                          Observable.Timer(TimeSpan.FromSeconds(1)), 
                          source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
                     )
               )

Any combination of operators will do, as long as the resulting observable produces a value at the moment the buffer should be closed.

Note: The value produced by the closing observable doesn't matter; only the notification does. So to combine sources of different types with Amb, simply project each one to System.Reactive.Unit:

Observable.Amb(stream1.Select(_ => Unit.Default), stream2.Select(_ => Unit.Default))
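Putting the pieces together for the weighted-request case from the question, a sketch might look like the following. The Request record, the 100-weight limit and the one-second window are assumptions made for illustration, and the source is a Subject (a hot observable) so that each closing observable only sees requests arriving after it subscribes. With a cold source, every closing observable would resubscribe and replay the sequence, so it would probably need to be shared first (for example with Publish).

```csharp
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical request type; Weight stands for whatever cost measure applies.
public sealed record Request(string Name, int Weight);

public static class Program
{
    public static void Main()
    {
        const int maxWeight = 100;             // assumed weight limit per batch
        var window = TimeSpan.FromSeconds(1);  // assumed time limit per batch

        var source = new Subject<Request>();

        // Close the buffer on whichever happens first: the timer firing,
        // or the running weight total reaching maxWeight.
        var batches = source.Buffer(() => Observable.Amb(
            Observable.Timer(window).Select(_ => Unit.Default),
            source.Select(r => r.Weight)
                  .Scan((total, w) => total + w)
                  .SkipWhile(total => total < maxWeight)
                  .Select(_ => Unit.Default)));

        using var subscription = batches.Subscribe(batch =>
            Console.WriteLine($"{batch.Count} request(s), weight {batch.Sum(r => r.Weight)}"));

        source.OnNext(new Request("a", 40));
        source.OnNext(new Request("b", 70));
        source.OnNext(new Request("c", 10));
        source.OnCompleted();
    }
}
```

Both branches are projected to Unit so that Amb can combine them regardless of their original element types.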