How to buffer based on time and count, but stopping the timer if no events occur
I'm producing a sequence of 50 items each tree seconds. I then want to batch them at max 20 items, but also not waiting more than one second before I release the buffer.
That works great!
But since the interval never dies, Buffer keeps firing empty batch chunks...
How can I avoid that? Shure Where(buf => buf.Count > 0)
should help - but that seems like a hack.
Observable
.Interval(TimeSpan.FromSeconds(3))
.Select(n => Observable.Repeat(n, 50))
.Merge()
.Buffer(TimeSpan.FromSeconds(1), 20)
.Subscribe(e => Console.WriteLine(e.Count));
Output:
0-0-0-20-20-10-0-20-20-10-0-0-20-20
The Where
filter you propose is a sound approach, I'd go with that.
You could wrap the Buffer
and Where
into a single helper method named to make the intent clearer perhaps, but rest assured the Where
clause is idiomatic Rx in this scenario.
Think of it this way; an empty Buffer is relaying information that no events occurred in the last second. While you can argue that this is implicit, it would require extra work to detect this if Buffer
didn't emit an empty list. It just so happens it's not information you are interested in - so Where
is an appropriate way to filter this information out.
A lazy timer solution
Following from your comment ("...the timer... be[ing] lazily initiated...") you can do this to create a lazy timer and omit the zero counts:
var source = Observable.Interval(TimeSpan.FromSeconds(3))
.Select(n => Observable.Repeat(n, 50))
.Merge();
var xs = source.Publish(pub =>
pub.Buffer(() => pub.Take(1).Delay(TimeSpan.FromSeconds(1))
.Merge(pub.Skip(19)).Take(1)));
xs.Subscribe(x => Console.WriteLine(x.Count));
Explanation
Publishing
This query requires subscribing to the source events multiple times. To avoid unexpected side-effects, we use Publish
to give us pub
which is a stream that multicasts the source
creating just a single subscription to it. This replaces the older Publish().RefCount()
technique that achieved the same end, effectively giving us a "hot" version of the source stream.
In this case, this is necessary to ensure the subsequent buffer closing streams produced after the first will start with the current events - if the source was cold they would start over each time. I wrote a bit about publishing here.
The main query
We use an overload of Buffer
that accepts a factory function that is called for every buffer emitted to obtain an observable stream whose first event is a signal to terminate the current buffer.
In this case, we want to terminate the buffer when either the first event into the buffer has been there for a full second, or when 20 events have appeared from the source - whichever comes first.
To achieve this we Merge
streams that describe each case - the Take(1).Delay(...)
combo describes the first condition, and the Skip(19).Take(1)
describes the second.
However, I would still test performance the easy way, because I still suspect this is overkill, but a lot depends on the precise details of the platform and scenario etc.
After using the accepted answer for quite a while I would now suggest a different implementation (inspired by James Skip / Take approach and this answer):
var source = Observable.Interval(TimeSpan.FromSeconds(3))
.Select(n => Observable.Repeat(n, 50))
.Merge();
var xs = source.BufferOmitEmpty(TimeSpan.FromSeconds(1), 20);
xs.Subscribe(x => Console.WriteLine(x.Count));
With an extension method BufferOmitEmpty
like:
public static IObservable<IList<TSource>> BufferOmitEmpty<TSource>(this IObservable<TSource> observable, TimeSpan maxDelay, int maxBufferCount)
{
return observable
.GroupByUntil(x => 1, g => Observable.Timer(maxDelay).Merge(g.Skip(maxBufferCount - 1).Take(1).Select(x => 1L)))
.Select(x => x.ToArray())
.Switch();
}
It is 'lazy', because no groups are created as long as there are no elements on the source sequence, so there are no empty buffers. As in Toms answer there is an other nice advantage to the Buffer / Where implementation, that is the buffer is started when the first element arrives. So elements following each other within buffer time after a quiet period are processed in the same buffer.
Why not to use the Buffer method
Three problems occured when I was using the Buffer approach (they might be irrelevant for the scope of the question, so this is a warning to people who use stack overflow answers in different contexts like me):
Delay
one thread is used per subscriber. (I can supply sample code for 2. and 3. but I'm insecure whether to post it here or in a different question because I cannot fully explain why it behaves this way)
RxJs5 has hidden features buried into their source code. It turns out it's pretty easy to achieve with bufferTime
From the source code, the signature looks like this:
export function bufferTime<T>(this: Observable<T>, bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: IScheduler): Observable<T[]>;
So your code would be like this:
observable.bufferTime(1000, null, 20)
链接地址: http://www.djcxy.com/p/67314.html
上一篇: 缓冲操作员计数和时间条件