Observable.Buffer可以在时间以外的其他地方使用吗?

我一直在寻找关于如何在rx中使用Observable.Buffer的例子,但是找不到比锅炉时间缓冲的东西更实际的东西。

似乎有一个重载指定一个“bufferClosingSelector”,但我无法围绕它思考。

我想要做的是创建一个按时间或“积累”缓冲的序列。 考虑一个请求流,其中每个请求都有一定的权重,而且我不想一次处理超过x累积的权重,或者如果没有足够的累积权限,只需给我上一个时间框架中的内容即可(常规缓冲区功能)


bufferClosingSelector是一个每次调用的函数来获得一个Observable,当缓冲区被期望关闭时它将产生一个值。

例如,

source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))像常规的Buffer(time)重载一样工作。

在你想加权的序列中,你可以对序列进行Scan ,然后决定你的聚合条件。

例如, source.Scan((a,c) => a + c).SkipWhile(a => a < 100)为您提供了一个序列,它在源序列source.Scan((a,c) => a + c).SkipWhile(a => a < 100)超过100个时生成一个值。

您可以使用Amb来比赛这两个结束条件,以查看哪个反应第一:

        .Buffer(() => Observable.Amb
                     (
                          Observable.Timer(TimeSpan.FromSeconds(1)), 
                          source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
                     )
               )

您可以使用任何系列的组合器,这些组合器在此时为缓冲区生成任何值。

注意:给闭幕选择器的值并不重要 - 这是重要的通知。 因此,将不同类型的源与Amb简单组合,只需将其更改为System.Reactive.Unit

Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())
链接地址: http://www.djcxy.com/p/67309.html

上一篇: Is it possible to Observable.Buffer on something other than time

下一篇: C#, BlockingCollection: How to wait until collection has less than N items