反应式扩展:缓冲直到用户闲置
我有一个程序,我正在接收事件并希望分批处理它们,以便在处理当前批次时进入的所有项目都将显示在下一批次中。
Rx中简单的TimeSpan和基于计数的Buffer方法会给我多批次的物品,而不是给我一大堆所有进来的物品(如果订户的时间超过了指定的TimeSpan或超过N个物品进入, N大于计数)。
我研究了使用Func <IObservable <TBufferClosing >>或IObservable <TBufferOpening>和Func <TBufferOpening,IObservable <TBufferClosing >>的更复杂的Buffer重载,但我找不到如何使用这些的示例,更少的数字了解如何将它们应用于我正在尝试做的事情。
这是否做你想要的?
var xs = new Subject<int>();
var ys = new Subject<Unit>();
var zss =
xs.Buffer(ys);
zss
.ObserveOn(Scheduler.Default)
.Subscribe(zs =>
{
Thread.Sleep(1000);
Console.WriteLine(String.Join("-", zs));
ys.OnNext(Unit.Default);
});
ys.OnNext(Unit.Default);
xs.OnNext(1);
Thread.Sleep(200);
xs.OnNext(2);
Thread.Sleep(600);
xs.OnNext(3);
Thread.Sleep(400);
xs.OnNext(4);
Thread.Sleep(300);
xs.OnNext(5);
Thread.Sleep(900);
xs.OnNext(6);
Thread.Sleep(100);
xs.OnNext(7);
Thread.Sleep(1000);
我的结果:
1-2-3
4-5
6-7
你需要什么来缓冲这些值,然后当工作人员准备就绪时,它会询问当前的缓冲区,然后重置它。 这可以通过RX和任务的组合来完成
class TicTac<Stuff> {
private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>();
List<Stuff> in = new List<Stuff>();
public void push(Stuff stuff){
lock(this){
if(in == null){
in = new List<Stuff>();
Items.SetResult(in);
}
in.Add(stuff);
}
}
private void reset(){
lock(this){
Items = new TaskCompletionSource<List<Stuff>>();
in = null;
}
}
public async Task<List<Stuff>> Items(){
List<Stuff> list = await Items.Task;
reset();
return list;
}
}
然后
var tictac = new TicTac<double>();
IObservable<double> source = ....
source.Subscribe(x=>tictac.Push(x));
然后在你的工人
while(true){
var items = await tictac.Items();
Thread.Sleep(100);
for each (item in items){
Console.WriteLine(item);
}
}
我之前完成这项工作的方式是在DotPeek / Reflector中调用ObserveOn方法,并采用它具有的排队概念并使其适应我们的要求。 例如,在具有快速滴答数据的UI应用程序(如财务)中,UI线程可能会充满事件,有时它不能足够快速地更新。 在这些情况下,我们希望删除除最后一个以外的所有事件(针对特定仪器)。 在这种情况下,我们将ObserveOn的内部队列更改为T的单个值(查找ObserveLatestOn(IScheduler))。 在你的情况下,你想要队列,但是你想推动整个队列不只是第一个值。 这应该让你开始。
链接地址: http://www.djcxy.com/p/11531.html