为什么当缓冲区不包含项目时,Rx缓冲区会连续执行方法?

我有一个Rx Observable作为缓冲区。 现在它在订阅时执行方法,当它获取10个项目或100毫秒后,以先到者为准。

我注意到,即使缓冲区中没有项目,我的方法也会每隔100 ms连续调用一次,这让我感到惊讶。 如果我的方法从缓冲区中没有收到任何项目,它就会立即返回,但我认为这很奇怪,它只是在这样的背景中搅动。

为什么是这样? 你如何建议我最好处​​理这个问题? 我是Rx的新手,所以也许我做了一些奇怪的事情。 以下是我的代码的简化版本:

private Subject<KeyValuePair<int, Action<MyData>>> serverRequests;

public MyBufferClass(IMyServer server, IScheduler scheduler)
{
    this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();

    this.serverRequests
        .Buffer(TimeSpan.FromMilliseconds(100), 10, scheduler)
        .Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}   

public void GetSingleItemFromServer(int id, Action<MyData> callback)
{
    this.serverRequests.OnNext(new KeyValuePair<int, Action<MyData>>(id, callback));
}

public void GetMultipleItemsFromServer(IEnumerable<KeyValuePair<int, Action<MyData>>> idsWithCallbacks)
{
    if (idsWithCallbacks.IsNullOrEmpty()) return;

    this.server.GetMultipleItems(idsWithCallbacks)
}

在我的测试中,如果我调用GetSingleItemFromServer 5次,然后将我的TestScheduler提前1000 ms,我认为GetMultipleItemsFromServer只会被调用一次,但会被调用10次。


在这种情况下,优雅的解决方案可以是直接在缓冲区之后使用Where操作符来过滤掉任何空的结果。 像这样的东西:

            stream
            .Buffer (...)
            .Where (x => x.Any())
            .Subscribe (x => {...}, ex => {...});

至于为什么缓冲区这样做,我想最好是展示一个空集合,让消费者选择如何处理它,而不是吞下它并否认这个机会。

在另一个笔记中,我不会在订阅模块中调用您的服务器。 我认为将任何异步操作作为Rx流组合本身的一部分并将Subscribe操作限制为处理最终结果的任何轻量级操作(即更新UI,记录成功/失败等)是更好的主意。喜欢这个:

(from request in serverRequests
            .Buffer (TimeSpan.FromMinutes (1))
            .Where (x => x.Any())
from response in Observable.Start(server.GetMultipleItems(...))
select response)
.Subscribe (x => {}, ex => {});

其优点包括:

- 能够在服务器调用中使用更多的Rx操作符,例如Timeout(),Retry(),Catch()等。

- 能够处理Subscribe()过载中的任何管道错误

- 使用SubscribeOn()/ ObserveOn()独立调度管道和Subscribe操作。


也许尝试这样:

public MyBufferClass(IMyServer server, IScheduler scheduler)
{
    this.serverRequests = new Subject<KeyValuePair<int, Action<MyData>>>();

    this.serverRequests
        .GroupByUntil(x => 1, x => Observable.Timer(TimeSpan.FromMilliseconds(1000)))
        .SelectMany(x => x.ToArray())
        .Subscribe(buffer => GetMultipleItemsFromServer(buffer));
}  

这不会给你空的结果。

对你的问题的答案.Buffer(...) - 这是它被设计的方式。 没有比这更复杂的了。

链接地址: http://www.djcxy.com/p/67319.html

上一篇: Why does Rx buffer continuously perform method when buffer contains no items?

下一篇: Reactive Extensions bug on Windows Phone