如何根据时间和计数进行缓冲,但如果没有事件发生,则停止定时器

我正在生成每个树秒50个项目的序列。 然后我想在最多20个物品的情况下对它们进行批量处理,但在释放缓冲区之前也不会等待超过一秒。

这很好!

但由于时间间隔永远不会消失,缓冲区不断发射空批量块......

我怎样才能避免这种情况? 舒尔Where(buf => buf.Count > 0)应该有所帮助 - 但是这看起来像一个黑客。

Observable
    .Interval(TimeSpan.FromSeconds(3))
    .Select(n => Observable.Repeat(n, 50))
    .Merge()
    .Buffer(TimeSpan.FromSeconds(1), 20)
    .Subscribe(e => Console.WriteLine(e.Count));

输出:

0-0-0-20-20-10-0-20-20-10-0-0-20-20

你提出的Where过滤器是一个很好的方法,我会去做。

您可以将BufferWhere包装到一个名为使目标更清晰的单个帮助程序方法中,但请放心, Where子句是这种情况下的惯用Rx。

这样想一想; 一个空的缓冲区正在传递最后一秒未发生事件的信息。 虽然您可以争辩说这是隐含的,但如果Buffer没有发出空列表,则需要额外的工作才能检测到这一点。 它只是发生,它不是你感兴趣的信息 - 所以Where适当的方式来过滤这些信息。

懒惰的计时器解决方案

从您的评论(“...计时器...正在懒惰地发起......”)开始,您可以执行此操作来创建延迟计时器并省略零计数:

var source = Observable.Interval(TimeSpan.FromSeconds(3))
                    .Select(n => Observable.Repeat(n, 50))
                    .Merge();

var xs = source.Publish(pub =>
    pub.Buffer(() => pub.Take(1).Delay(TimeSpan.FromSeconds(1))
                        .Merge(pub.Skip(19)).Take(1)));

xs.Subscribe(x => Console.WriteLine(x.Count));

说明

出版

此查询需要多次订阅源事件。 为了避免意想不到的副作用,我们使用Publishpub提供pub ,它是一个多播source的流,只创建一个订阅。 这取代了实现相同目标的旧版Publish().RefCount()技术,有效地为我们提供了源流的“热”版本。

在这种情况下,这对于确保在第一个缓冲区关闭流将在当前事件开始之后产生是必需的 - 如果源很冷,它们将每次都重新开始。 我在这里写了一些关于发布的内容。

主查询

我们使用Buffer的重载,它接受一个工厂函数,该函数被调用来发送每个缓冲区以获得可观察的流,其第一个事件是终止当前缓冲区的信号。

在这种情况下,当第一个进入缓冲区的事件已经停留了整整一秒,或者当源出现了20个事件 - 以先到者为准时,我们希望终止缓冲区。

为了实现这一点,我们Merge了描述每个案例的流 - Take(1).Delay(...)组合描述了第一个条件, Skip(19).Take(1)描述了第二个条件。

但是,我仍然会以简单的方式测试性能,因为我仍然怀疑这是过度的,但很大程度上取决于平台和场景的确切细节等。


在使用接受的答案很长一段时间之后,我现在会提出一个不同的实现(受James Skip / Take方法和这个答案的启发):

var source = Observable.Interval(TimeSpan.FromSeconds(3))
    .Select(n => Observable.Repeat(n, 50))
    .Merge();

var xs = source.BufferOmitEmpty(TimeSpan.FromSeconds(1), 20);

xs.Subscribe(x => Console.WriteLine(x.Count));

用一个扩展方法BufferOmitEmpty就像:

public static IObservable<IList<TSource>> BufferOmitEmpty<TSource>(this IObservable<TSource> observable, TimeSpan maxDelay, int maxBufferCount)
{
    return observable
        .GroupByUntil(x => 1, g => Observable.Timer(maxDelay).Merge(g.Skip(maxBufferCount - 1).Take(1).Select(x => 1L)))
        .Select(x => x.ToArray())
        .Switch();
}  

这是'懒惰',因为只要源序列中没有元素就不会创建组,因此没有空缓冲区。 正如汤姆斯的回答,Buffer / Where实现还有一个很好的优势,那就是当第一个元素到达时缓冲区就会启动。 因此,在相同的缓冲区中处理在静默期之后的缓冲时间内彼此跟随的元素。

为什么不使用Buffer方法

当我使用Buffer方法时发生了三个问题(它们可能与问题的范围无关,所以这是对在我的不同环境中使用堆栈溢出答案的人的警告):

  • 由于Delay每个用户使用一个线程。
  • 在运行时间较长的情况下,源序列中的元素可能会丢失。
  • 有了多个订阅者,它有时会创建大于maxBufferCount的缓冲区。
  • (我可以提供2和3的示例代码,但是我不确定是将它发布在这里还是另一个问题中,因为我无法完全解释为什么它以这种方式运行)


    RxJs5具有隐藏在源代码中的隐藏功能。 事实证明,使用bufferTime可以非常容易地实现

    从源代码中,签名看起来像这样:

    export function bufferTime<T>(this: Observable<T>, bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: IScheduler): Observable<T[]>;
    

    所以你的代码会是这样的:

    observable.bufferTime(1000, null, 20)
    
    链接地址: http://www.djcxy.com/p/67313.html

    上一篇: How to buffer based on time and count, but stopping the timer if no events occur

    下一篇: Reactive Extensions: Process events in batches + add delay between every batch