Reactive Extensions: buffer until subscriber is idle

I have a program where I'm receiving events and want to process them in batches, so that all items that come in while I'm processing the current batch will appear in the next batch.

The simple TimeSpan- and count-based Buffer methods in Rx don't do this. Whenever the subscriber takes longer than the specified TimeSpan, or more than count items arrive in the meantime, they give me several batches instead of one big batch of everything that has come in.

I looked at using the more complex Buffer overloads that take Func<IObservable<TBufferClosing>> or IObservable<TBufferOpening> and Func<TBufferOpening, IObservable<TBufferClosing>>, but I can't find examples of how to use these, much less figure out how to apply them to what I'm trying to do.


Does this do what you want?

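// Requires the System.Reactive package.
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

var xs = new Subject<int>();   // incoming events
var ys = new Subject<Unit>();  // "subscriber is idle" signal; each tick closes the current buffer

var zss =
    xs.Buffer(ys);

zss
    .ObserveOn(Scheduler.Default)
    .Subscribe(zs =>
    {
        Thread.Sleep(1000);                      // simulate slow processing
        Console.WriteLine(String.Join("-", zs));
        ys.OnNext(Unit.Default);                 // done: release everything buffered meanwhile
    });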

ys.OnNext(Unit.Default);
xs.OnNext(1);
Thread.Sleep(200);
xs.OnNext(2);
Thread.Sleep(600);
xs.OnNext(3);
Thread.Sleep(400);
xs.OnNext(4);
Thread.Sleep(300);
xs.OnNext(5);
Thread.Sleep(900);
xs.OnNext(6);
Thread.Sleep(100);
xs.OnNext(7);
Thread.Sleep(1000);

My Result:

1-2-3
4-5
6-7
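
Since the question also asks about the Buffer overload that takes a Func<IObservable<TBufferClosing>>, here is a minimal sketch of that one under the same idea. The selector is called each time a new buffer opens, and the buffer closes when the returned observable ticks. The names ClosingSelectorDemo and idle are made up for illustration, and the example runs synchronously on one thread so the order is easy to follow; it is not meant as a drop-in solution.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class ClosingSelectorDemo
{
    public static List<string> Run()
    {
        var printed = new List<string>();
        var xs = new Subject<int>();     // incoming events
        var idle = new Subject<Unit>();  // hypothetical "I'm ready for more" signal

        // The lambda runs whenever a buffer opens; a tick from `idle`
        // closes that buffer and opens the next one.
        xs.Buffer(() => idle)
          .Subscribe(zs => printed.Add(String.Join("-", zs)));

        xs.OnNext(1);
        xs.OnNext(2);
        idle.OnNext(Unit.Default);  // closes the buffer holding 1 and 2
        xs.OnNext(3);
        idle.OnNext(Unit.Default);  // closes the buffer holding 3
        return printed;
    }
}
```

Calling ClosingSelectorDemo.Run() should give the two batches "1-2" and "3". In the real program the idle signal would be raised by the subscriber when it finishes a batch, as in the code above.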

What you need is something that buffers the values; when the worker is ready, it asks for the current buffer and resets it. This can be done with a combination of Rx and Task.

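using System.Collections.Generic;
using System.Threading.Tasks;

// Collects pushed items into a batch that a single worker takes with Items().
class TicTac<Stuff> {

    private readonly object gate = new object();

    // Completed once the current batch holds at least one item.
    private TaskCompletionSource<bool> ready = NewSignal();

    private List<Stuff> incoming = new List<Stuff>();

    private static TaskCompletionSource<bool> NewSignal(){
        // Keeps the awaiting worker from resuming inline inside Push's lock.
        return new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    }

    public void Push(Stuff stuff){
        lock(gate){
            incoming.Add(stuff);
            ready.TrySetResult(true);
        }
    }

    // Hands over everything collected so far and starts a fresh batch.
    private List<Stuff> Reset(){
        lock(gate){
            var batch = incoming;
            incoming = new List<Stuff>();
            ready = NewSignal();
            return batch;
        }
    }

    public async Task<List<Stuff>> Items(){
        Task signal;
        lock(gate){ signal = ready.Task; }
        await signal;
        return Reset();
    }
}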

Then feed it from your observable:

var tictac = new TicTac<double>();

IObservable<double> source = ....

source.Subscribe(x=>tictac.Push(x));

Then in your worker

while(true){

    // Completes once at least one item has arrived since the previous batch.
    var items = await tictac.Items();

    Thread.Sleep(100); // simulate work

    foreach (var item in items){
        Console.WriteLine(item);
    }

}

The way I have done this before is to open the ObserveOn method in dotPeek/Reflector, take the queuing concept it uses, and adapt it to our requirements. For example, in UI applications with fast-ticking data (like finance), the UI thread can get flooded with events and sometimes it can't update quickly enough. In these cases we want to drop all events except the last one (for a particular instrument), so we changed the internal Queue of ObserveOn to a single value of T (look for ObserveLatestOn(IScheduler)).

In your case you want to keep the Queue, but push the whole queue to the subscriber at once rather than just the first value. This should get you started.
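
To make the "push the whole queue" idea a bit more concrete, here is a rough sketch of what such an operator could look like. ObserveBatchesOn is a made-up name and not part of Rx, and this is not the actual ObserveOn source, just the same queue-and-drain shape written from scratch. Items are queued while the subscriber is busy, and each drain hands over everything queued so far as one list.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

static class BatchingExtensions
{
    // Hypothetical operator (not part of Rx): delivers queued items in batches.
    public static IObservable<IList<T>> ObserveBatchesOn<T>(
        this IObservable<T> source, IScheduler scheduler)
    {
        return Observable.Create<IList<T>>(observer =>
        {
            var gate = new object();
            var queue = new List<T>();
            var draining = false;   // true while a drain is scheduled or running
            var completed = false;
            Exception error = null;

            void Drain()
            {
                while (true)
                {
                    List<T> batch;
                    bool done;
                    lock (gate)
                    {
                        batch = queue;            // take the whole queue...
                        queue = new List<T>();    // ...and start a fresh one
                        done = completed;
                        if (batch.Count == 0 && !done) { draining = false; return; }
                    }
                    if (batch.Count > 0) observer.OnNext(batch);
                    if (done)
                    {
                        if (error != null) observer.OnError(error); else observer.OnCompleted();
                        return;
                    }
                }
            }

            void Signal(Action update)
            {
                bool start;
                lock (gate)
                {
                    update();
                    start = !draining;
                    draining = true;
                }
                if (start) scheduler.Schedule(Drain);
            }

            return source.Subscribe(
                x => Signal(() => queue.Add(x)),
                ex => Signal(() => { error = ex; completed = true; }),
                () => Signal(() => completed = true));
        });
    }
}

static class BatchDemo
{
    public static List<string> Run()
    {
        var seen = new List<string>();
        var xs = new Subject<int>();

        // Scheduler.Immediate keeps the demo deterministic; a real program
        // would more likely pass a background or UI scheduler.
        xs.ObserveBatchesOn(Scheduler.Immediate).Subscribe(batch =>
        {
            seen.Add(String.Join("-", batch));
            if (seen.Count == 1)
            {
                // Simulate events arriving while the first batch is being processed.
                xs.OnNext(2);
                xs.OnNext(3);
            }
        });

        xs.OnNext(1);
        return seen;
    }
}
```

In the demo, 2 and 3 arrive while the first batch is still being handled, so they come out together as the second batch. The sketch assumes the source follows the usual Rx contract of not calling OnNext concurrently, and it does not cancel an already scheduled drain on unsubscribe, so treat it as a starting point only.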
