Observable.Buffer可以在时间以外的其他地方使用吗?
我一直在寻找关于如何在rx中使用Observable.Buffer的例子,但是找不到比锅炉时间缓冲的东西更实际的东西。
似乎有一个重载指定一个“bufferClosingSelector”,但我无法围绕它思考。
我想要做的是创建一个按时间或“积累”缓冲的序列。 考虑一个请求流,其中每个请求都有一定的权重,而且我不想一次处理超过x累积的权重,或者如果没有足够的累积权限,只需给我上一个时间框架中的内容即可(常规缓冲区功能)
bufferClosingSelector
是一个每次调用的函数来获得一个Observable,当缓冲区被期望关闭时它将产生一个值。
例如,
source.Buffer(() => Observable.Timer(TimeSpan.FromSeconds(1)))
像常规的Buffer(time)
重载一样工作。
在你想加权的序列中,你可以对序列进行Scan
,然后决定你的聚合条件。
例如, source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
为您提供了一个序列,它在源序列source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
超过100个时生成一个值。
您可以使用Amb
来比赛这两个结束条件,以查看哪个反应第一:
.Buffer(() => Observable.Amb
(
Observable.Timer(TimeSpan.FromSeconds(1)),
source.Scan((a,c) => a + c).SkipWhile(a => a < 100)
)
)
您可以使用任何系列的组合器,这些组合器在此时为缓冲区生成任何值。
注意:给闭幕选择器的值并不重要 - 这是重要的通知。 因此,将不同类型的源与Amb
简单组合,只需将其更改为System.Reactive.Unit
。
Observable.Amb(stream1.Select(_ => new Unit()), stream2.Select(_ => new Unit())
链接地址: http://www.djcxy.com/p/67309.html
上一篇: Is it possible to Observable.Buffer on something other than time
下一篇: C#, BlockingCollection: How to wait until collection has less than N items