如何在RX中使用超时实现缓冲

我需要实现一个事件处理,当没有新的事件到达某段时间时会延迟处理。 (当文本缓冲区发生变化时,我必须排队解析任务,但我不想在用户仍在键入时开始解析。)

我是RX新手,但据我所知,我需要结合使用BufferWithTime和Timeout方法。 我想这是这样工作的:它缓冲事件,直到它们在后续事件之间的特定时间段内定期接收。 如果事件流中存在间隙(比时间跨度长),它应该返回传播缓冲的事件。

看看如何实现Buffer和Timeout,我可以实现我的BufferWithTimeout方法(如果每个人都有一个,请与我分享),但我想知道是否可以通过结合现有方法来实现。 有任何想法吗?


我认为BufferWithTime就是你所追求的。

没有内置任何东西,但是像这样的东西应该可以工作:

注意:如果源发生错误,缓冲区不会刷新。 这与BufferWith*的当前(或当前最后一次检查)功能相BufferWith*

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout)
{
    return source.BufferWithTimeout(timeout, Scheduler.TaskPool);
}

public static IObservable<TSource[]> BufferWithTimeout<TSource>(
    this IObservable<TSource> source, TimeSpan timeout, IScheduler scheduler)
{
    return Observable.CreateWithDisposable<TSource[]>(observer =>
    {
        object lockObject = new object();
        List<TSource> buffer = new List<TSource>();

        MutableDisposable timeoutDisposable = new MutableDisposable();

        Action flushBuffer = () =>
        {
            TSource[] values;

            lock(lockObject)
            {
                values = buffer.ToArray();
                buffer.Clear();
            }

            observer.OnNext(values);
        };

        var sourceSubscription = source.Subscribe(
            value =>
            {
                lock(lockObject)
                {
                    buffer.Add(value);
                }

                timeoutDisposable.Disposable = 
                    scheduler.Schedule(flushBuffer, timeout);
            },
            observer.OnError,
            () =>
            {
                flushBuffer();
                observer.OnCompleted();
            });

        return new CompositeDisposable(sourceSubscription, timeoutDisposable);
    });
}

这是一个相当古老的问题,但我确实相信以下答案值得一提,因为所有其他解决方案都迫使用户手动订阅,追踪更改等。

我提供以下作为“Rx-y”解决方案。

var buffers = source
    .GroupByUntil(
        // yes. yes. all items belong to the same group.
        x => true,
        g => Observable.Amb<int>(
               // close the group after 5 seconds of inactivity
               g.Throttle(TimeSpan.FromSeconds(5)),
               // close the group after 10 items
               g.Skip(9)
             ))
    // Turn those groups into buffers
    .SelectMany(x => x.ToArray());

基本上,源代码被窗口化,直到用最新的窗口来定义一些可观察的。 创建一个新窗口(分组可观察),我们使用该窗口来确定窗口何时应该关闭。 在这种情况下,我会在5秒钟不活动或最大长度为10(9 + 1)后关闭窗口。


除了Richard Szalay的回答之外,我刚刚从最新的rx版本中看到了新的Window操作符。 它有点'解决了你的问题,因为你可以'超时缓冲',也就是在一个持续到达到超时的时间窗口内获得输出,而不是像IEnumerable那样接收结果作为IObservable。

下面是我的意思的一个简单例子:

private void SetupStream()
{
    var inputStream = Observable.FromEvent<MouseButtonEventHandler, MouseButtonEventArgs>(
        h => new MouseButtonEventHandler(h), 
        h => MouseDown += h,
        h => MouseDown -= h);

    var timeout = inputStream.Select(evt => Observable.Timer(TimeSpan.FromSeconds(10), Scheduler.Dispatcher))
        .Switch();

    inputStream.Window(() => timeout)
        .Subscribe(OnWindowOpen);
}


private void OnWindowOpen(IObservable<IEvent<MouseButtonEventArgs>> window)
{
    Trace.WriteLine(string.Format("Window open"));

    var buffer = new List<IEvent<MouseButtonEventArgs>>();

    window.Subscribe(click =>
    {

        Trace.WriteLine(string.Format("Click"));

        buffer.Add(click);

    }, () => ProcessEvents(buffer));
}

private void ProcessEvents(IEnumerable<IEvent<MouseButtonEventArgs>> clicks)
{
    Trace.WriteLine(string.Format("Window closed"));

    //...
}

每当窗口打开时,您会收到所有事件,并将它们存储在缓冲区中,并在窗口完成时处理(实际发生在下一个窗口打开时)。

不知道理查德是否会改变他的例子使用Window现在可用,但认为它可能是值得提出作为替代。

链接地址: http://www.djcxy.com/p/67305.html

上一篇: How to implement buffering with timeout in RX

下一篇: Good introduction to the .NET Reactive Framework