Handling a stream of events based on timing in rxjs

I have a process that sends me packets of data at intervals, and I need to manage that stream based on the timing of when the packets arrive and so on. At some point I also close the stream and the process.

Right now, I'm using a set of timers to do this, but I hope I can do it with rxjs since it seems a very good fit for this kind of thing. So far, I haven't had much success.

The problem

The stream is supposed to send me packets at regular intervals, but it usually deviates a lot and sometimes gets stuck.

I want to close the stream at some point, under the following conditions:

  • If it takes more than startDelay to send me the first packet.
  • After the first packet is sent, if there is a pause of more than middleDelay between two packets.
  • After a constant time period maxChannelTime .
  • When I'm about to close the stream due to any of the above reasons, I first request it to close politely so it can do some cleanup. Sometimes it also sends me a final data packet during the cleanup. But I want to wait no longer than cleanupTime for the cleanup and last data to arrive before I close the stream and ignore any more messages.

    Elaboration

    I'll create the "streams" by wrapping an event with an Observable. I have no trouble doing that.

    By "closing" a stream, I mean telling the process to stop emitting data, and possibly to close (ie die).


    Tricky problem.

    I've broken it down to two phases - 'regulated' (since we want to check for regular intervals), and 'cleanup'.

    Working backwards, the output is

    const regulated = source.takeUntil(close)
    const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
    const output = regulated.merge(cleanup)
    

    'Closers' are observables that emit when it's time to close (one closer per timeout value).

    const startTimeout = 600
    const intervalTimeout = 200
    const maxtimeTimeout = 3000
    const cleanupTimeout = 300
    
    const startCloser = Observable.timer(startTimeout)  // emit once after initial delay
      .takeUntil(source)                                // cancel after source emits
      .mapTo('startTimeoutMarker')
    
    const intervalCloser = source.switchMap(x =>    // reset interval after each source emit
        Observable.timer(intervalTimeout)           // emit once after intervalTimeout
          .mapTo('intervalTimeoutMarker')
      )
    
    const maxtimeCloser = Observable.timer(maxtimeTimeout)  // emit once after maxtime
      .takeUntil(startCloser)                               // cancel if startTimeout
      .takeUntil(intervalCloser)                            // cancel if intervalTimeout
      .mapTo('maxtimeTimeoutMarker')
    
    const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)
    
    const cleanupCloser = close.switchMap(x =>      // start when close emits
         Observable.timer(cleanupTimeout)           // emit once after cleanup time
      ) 
      .mapTo('cleanupTimeoutMarker')
    

    Here's a working sample CodePen (please run tests one at a time)


    It's hard to give any advice without knowing how you create the "streams" with RxJS or how you want to use them later.

    In general you could achieve what you want with just takeUntil() , switchMap() and timeout() .

    Observable.defer(...)
      .startWith(undefined) // Trigger the first `timeout`
      .switchMap((val, i) => {
        if (i === 0) { // waiting for the first value
          return Observable.of().timeout(startDelay);
        } else {
          return Observable.of(val).timeout(middleDelay);
        }
      })
      .takeUntil(Observable.timer(maxChannelTime));
    

    I don't know what you mean by "close the stream at some point". Do you expect the error or complete notification? This solution will emit error when timeout expires and complete if takeUntil emits.


    In the end, this is what I did. My answer is mainly based on Richard Matsen's answer, so I'm leaving his answer as accepted.

    There turned out to be a few additional changes I needed to make.

    This code is the code that takes in a stream of data messages and returns a singleton observable containing all the data gathered and the reason for termination.

    let startCloser$ = Observable.timer(this.options.maxStartDelay).takeUntil(dataStream$).mapTo(TerminationReason.StartTimeout);
    
    let intervalCloser$ = dataStream$.switchMap(x => Observable.timer(this.options.timeBetweenPackets).mapTo(TerminationReason.Inactivity));
    
    let maxTimeCloser$ = Observable.timer(this.options.totalConnectionTime).takeUntil(startCloser$).takeUntil(intervalCloser$).mapTo(TerminationReason.ChannelTimeout);
    
    //we need to publishReplay it so we can get the reason afterwards...
    let close$ = startCloser$.merge(intervalCloser$, maxTimeCloser$).take(1).publishReplay(1);
    //basically treating close$ like a promise
    close$.connect();
    
    //cleanupAction has side-effects so it must only be subscribed to once.
    let cleanupAction$ = Observable.defer(async () => {
        //it's just a promise that yields nothing and waits until requestTermination has terminated
        //requestTermination is an async function and it already has a timeout thing in promise-language
        await this.requestTermination();
    });
    
    let result$ = dataStream$.takeUntil(close$).concat(dataStream$.takeUntil(cleanupAction$)).toArray().switchMap(arrs => {
        //switchMap will only resolve once because the observable is a singleton
    
        return close$.map(reason => {
            //this should fire immediately because close is publishReplay(1) and has already happened
            let totalArr = _.flattenDeep(arrs);
            return {
                reason : reason,
                data : totalArr
            }
        })
    });
    
    return result$;
    
    链接地址: http://www.djcxy.com/p/40722.html

    上一篇: 无法推断超类

    下一篇: 根据rxjs中的时间处理事件流