反应式扩展:缓冲直到用户闲置

我有一个程序,我正在接收事件并希望分批处理它们,以便在处理当前批次时进入的所有项目都将显示在下一批次中。

Rx中简单的TimeSpan和基于计数的Buffer方法会给我多批次的物品,而不是给我一大堆所有进来的物品(如果订户的时间超过了指定的TimeSpan或超过N个物品进入, N大于计数)。

我研究了使用Func <IObservable <TBufferClosing >>或IObservable <TBufferOpening>和Func <TBufferOpening,IObservable <TBufferClosing >>的更复杂的Buffer重载,但我找不到如何使用这些的示例,更少的数字了解如何将它们应用于我正在尝试做的事情。


这是否做你想要的?

var xs = new Subject<int>();
var ys = new Subject<Unit>();

var zss =
    xs.Buffer(ys);

zss
    .ObserveOn(Scheduler.Default)
    .Subscribe(zs =>
    {
        Thread.Sleep(1000);
        Console.WriteLine(String.Join("-", zs));
        ys.OnNext(Unit.Default);
    });

ys.OnNext(Unit.Default);
xs.OnNext(1);
Thread.Sleep(200);
xs.OnNext(2);
Thread.Sleep(600);
xs.OnNext(3);
Thread.Sleep(400);
xs.OnNext(4);
Thread.Sleep(300);
xs.OnNext(5);
Thread.Sleep(900);
xs.OnNext(6);
Thread.Sleep(100);
xs.OnNext(7);
Thread.Sleep(1000);

我的结果:

1-2-3
4-5
6-7

你需要什么来缓冲这些值,然后当工作人员准备就绪时,它会询问当前的缓冲区,然后重置它。 这可以通过RX和任务的组合来完成

class TicTac<Stuff> {

    private TaskCompletionSource<List<Stuff>> Items = new TaskCompletionSource<List<Stuff>>();

    List<Stuff> in = new List<Stuff>();

    public void push(Stuff stuff){
        lock(this){
            if(in == null){
                in = new List<Stuff>();
                Items.SetResult(in);
            }
            in.Add(stuff);
        }
    }

    private void reset(){
        lock(this){
            Items = new TaskCompletionSource<List<Stuff>>();
            in = null;
        }
    }

    public async Task<List<Stuff>> Items(){
        List<Stuff> list = await Items.Task;
        reset();
        return list;
    }
}

然后

var tictac = new TicTac<double>();

IObservable<double> source = ....

source.Subscribe(x=>tictac.Push(x));

然后在你的工人

while(true){

    var items = await tictac.Items();

    Thread.Sleep(100);

    for each (item in items){
        Console.WriteLine(item);
    }

}

我之前完成这项工作的方式是在DotPeek / Reflector中调用ObserveOn方法,并采用它具有的排队概念并使其适应我们的要求。 例如,在具有快速滴答数据的UI应用程序(如财务)中,UI线程可能会充满事件,有时它不能足够快速地更新。 在这些情况下,我们希望删除除最后一个以外的所有事件(针对特定仪器)。 在这种情况下,我们将ObserveOn的内部队列更改为T的单个值(查找ObserveLatestOn(IScheduler))。 在你的情况下,你想要队列,但是你想推动整个队列不只是第一个值。 这应该让你开始。

链接地址: http://www.djcxy.com/p/11531.html

上一篇: Reactive Extensions: buffer until subscriber is idle

下一篇: Morphological separation of two connected boundaries