Why does an Rx Buffer keep calling my method when the buffer contains no items?

I have an Rx Observable that acts as a buffer. Right now it calls the method passed to Subscribe either when it has collected 10 items or after 100 milliseconds, whichever comes first.

I noticed that my method is continuously being called every 100 ms, even when there are no items in the buffer, which surprised me. It's simple enough to just make my method return immediately if it receives no items from the buffer, but I thought it was weird that it's just churning away in the background like that.

Why is this? How do you recommend I best deal with this? I am a complete newbie to Rx, so maybe I'm doing something weird. Here's a simplified version of my code:

private readonly IMyServer server;
private Subject<KeyValuePair<int, Action<MyData>>> serverRequests;

public MyBufferClass(IMyServer server, IScheduler scheduler)
{
    this.server = server;
    this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();

    this.serverRequests
        .Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler)
        .Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}

public void GetSingleItemFromServer(int id, Action<MyData> callback)
{
    this.serverRequests.OnNext(new KeyValuePair<int, Action<MyData>>(id, callback));
}

public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action<MyData>>> idsWithCallbacks)
{
    if (idsWithCallbacks.IsNullOrEmpty()) return;

    this.server.GetMultipleItems(idsWithCallbacks);
}

In my tests, if I call GetSingleItemFromServer 5 times and then advance my TestScheduler by 1000 ms, I thought GetMultipleItemsFromServer would only be called once, but it gets called 10 times.


In situations like this an elegant solution can be to use the Where operator straight after the Buffer to filter out any empty results. Something like this:

    stream
        .Buffer(...)
        .Where(x => x.Any())
        .Subscribe(x => {...}, ex => {...});

As to why Buffer acts like this, I suppose it's better to surface an empty collection and allow the consumer to choose what to do with it, than to swallow it and deny that opportunity.
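To see the effect of the filter, here is a minimal sketch that mirrors the test described in the question. It assumes the System.Reactive and Microsoft.Reactive.Testing packages are referenced; the class name `BufferDemo` and the plain `int` requests are made up for illustration and stand in for the question's KeyValuePair items.

```csharp
using System;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using Microsoft.Reactive.Testing;

class BufferDemo
{
    static void Main()
    {
        var scheduler = new TestScheduler();
        var requests = new Subject<int>(); // hypothetical stand-in for serverRequests

        var allBuffers = 0;
        var nonEmptyBuffers = 0;

        // Same operator and arguments as in the question: 100 ms or 10 items.
        var buffered = requests.Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler);

        // Counts every buffer, including the empty ones produced by the timer.
        buffered.Subscribe(_ => allBuffers++);

        // Counts only the buffers that survive the Where filter.
        buffered.Where(batch => batch.Any()).Subscribe(_ => nonEmptyBuffers++);

        // Five requests, then one second of virtual time, as in the question's test.
        for (var id = 0; id < 5; id++)
        {
            requests.OnNext(id);
        }
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(1000).Ticks);

        Console.WriteLine($"buffers emitted: {allBuffers}, non-empty: {nonEmptyBuffers}");
    }
}
```

Going by what the question reports, the first counter should reflect one buffer per 100 ms tick, while the filtered subscription should only see the single buffer that holds the five requests.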

On a separate note, I wouldn't make the server call inside the Subscribe block. I think it's better to make any asynchronous operations part of the Rx stream composition itself, and to restrict the Subscribe action to lightweight operations that deal with the final result, i.e. updating the UI, logging success/failure, etc. Something like this:

(from request in serverRequests
        .Buffer(TimeSpan.FromMinutes(1))
        .Where(x => x.Any())
 // Observable.Start expects a delegate, so the call is wrapped in a lambda
 from response in Observable.Start(() => server.GetMultipleItems(...))
 select response)
    .Subscribe(x => {}, ex => {});

Advantages to this include:

- Being able to use further Rx operators on your server call, such as Timeout(), Retry(), Catch(), etc.

- Being able to handle any pipeline errors within the Subscribe() overload

- Independent scheduling of the pipeline and the Subscribe action with SubscribeOn()/ObserveOn().
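As a rough sketch of the first two points, the server call can be wrapped so that Timeout(), Retry() and Catch() apply to each batch. Everything named here is an assumption for illustration: `FetchFromServer` is a hypothetical stand-in for the real server call, requests are plain ints, and the timeout and retry values are arbitrary. The call is wrapped in Observable.Defer because Observable.Start runs its function once and replays that result, so Retry() on it alone would not trigger a new call.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class PipelineDemo
{
    // Hypothetical stand-in for the real server call.
    static IList<string> FetchFromServer(IList<int> ids) =>
        ids.Select(id => "item " + id).ToList();

    static void Main()
    {
        var requests = new Subject<int>();
        var done = new ManualResetEventSlim();

        requests
            .Buffer(TimeSpan.FromMilliseconds(100), 10)
            .Where(batch => batch.Any())
            .SelectMany(batch =>
                // Defer makes each (re)subscription start a fresh server call.
                Observable.Defer(() => Observable.Start(() => FetchFromServer(batch)))
                    .Timeout(TimeSpan.FromSeconds(5))   // give up on a slow call
                    .Retry(3)                           // at most three attempts in total
                    // Swallow a batch that still fails so the pipeline keeps running.
                    .Catch<IList<string>, Exception>(ex => Observable.Empty<IList<string>>()))
            .Subscribe(
                items => Console.WriteLine(string.Join(", ", items)), // lightweight result handling
                ex => Console.WriteLine("pipeline failed: " + ex.Message),
                () => done.Set());

        for (var id = 1; id <= 3; id++)
        {
            requests.OnNext(id);
        }
        requests.OnCompleted(); // flushes the last buffer and completes the pipeline

        done.Wait(); // keep the process alive until the background work has finished
    }
}
```

Whether a failed batch should be dropped, as the Catch() here does, or surfaced to the Subscribe error handler is a design choice that depends on how the callers expect failures to be reported.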


Perhaps try it like this:

public MyBufferClass(IMyServer server, IScheduler scheduler)
{
    this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();

    this.serverRequests
        .GroupByUntil(x => 1, x => Observable.Timer(TimeSpan.FromMilliseconds(1000), scheduler))
        .SelectMany(x => x.ToArray())
        .Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}  

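That doesn't give you empty results, because a group (and its timer) is only created when an item arrives. Note that, unlike your Buffer call, this closes each batch on the timer alone and has no 10-item limit.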

And the answer to your question regarding .Buffer(...) - that's the way it has been designed. Nothing more complicated than that.
