Handling a stream of events based on timing in rxjs
I have a process that sends me packets of data at intervals, and I need to manage that stream based on the timing of when the packets arrive and so on. At some point I also close the stream and the process.
Right now, I'm using a set of timers to do this, but I hope I can do it with rxjs, since it seems like a very good fit for this kind of thing. So far, I haven't had much success.
The problem
The stream is supposed to send me packets at regular intervals, but it usually deviates a lot and sometimes gets stuck.
I want to close the stream at some point, under the following conditions:
If it takes longer than startDelay to send me the first packet.
If more than middleDelay passes between two packets.
After a fixed total time, maxChannelTime.
When I'm about to close the stream for any of the above reasons, I first request it to close politely so it can do some cleanup. Sometimes it also sends me a final data packet during the cleanup. But I want to wait no longer than cleanupTime for the cleanup and last data to arrive before I close the stream and ignore any more messages.
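To make the three closing conditions concrete, here is a small plain-JavaScript sketch (no RxJS) that takes packet arrival times in milliseconds and works out when and why the stream should be asked to close. The function name findCloseEvent, the reason strings and the sample numbers are made up for illustration, and it assumes that a packet arriving exactly on a deadline counts as too late.

```javascript
// Hypothetical helper, not part of RxJS: models the three closing conditions.
// `arrivals` is a sorted list of packet arrival times (ms since the stream opened).
function findCloseEvent(arrivals, { startDelay, middleDelay, maxChannelTime }) {
  let deadline = startDelay;   // the first packet must arrive before this time
  let reason = 'startTimeout';

  for (const t of arrivals) {
    // Assumption: a packet landing exactly on a deadline loses to the timer.
    if (t >= Math.min(deadline, maxChannelTime)) break;
    deadline = t + middleDelay; // each packet restarts the inactivity timer
    reason = 'intervalTimeout';
  }

  // Whichever fires first wins: the pending deadline or the overall limit.
  return deadline <= maxChannelTime
    ? { reason, at: deadline }
    : { reason: 'maxtimeTimeout', at: maxChannelTime };
}

const sampleOptions = { startDelay: 600, middleDelay: 200, maxChannelTime: 3000 };

// Nothing ever arrives: closed by the start timeout at 600 ms.
console.log(findCloseEvent([], sampleOptions));

// Packets stall after 400 ms: closed for inactivity at 400 + 200 = 600 ms.
console.log(findCloseEvent([100, 250, 400, 900], sampleOptions));
```

This is only a model of the timing rules; in the real stream the same decisions would be made by timers or by RxJS operators as the packets arrive.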
Elaboration
I'll create the "streams" by wrapping an event with an Observable. I have no trouble doing that.
By "closing" a stream, I mean telling the process to stop emitting data, and possibly to close (i.e. die).
Tricky problem.
I've broken it down into two phases: 'regulated' (since we want to check for regular intervals), and 'cleanup'.
Working backwards, the output is
const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)
'Closers' are observables that emit when it's time to close (one closer per timeout value).
const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300
const startCloser = Observable.timer(startTimeout) // emit once after initial delay
  .takeUntil(source) // cancel after source emits
  .mapTo('startTimeoutMarker')

const intervalCloser = source.switchMap(x => // reset interval after each source emit
  Observable.timer(intervalTimeout) // emit once after intervalTimeout
    .mapTo('intervalTimeoutMarker')
)

const maxtimeCloser = Observable.timer(maxtimeTimeout) // emit once after maxtime
  .takeUntil(startCloser) // cancel if startTimeout
  .takeUntil(intervalCloser) // cancel if intervalTimeout
  .mapTo('maxtimeTimeoutMarker')

const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)

const cleanupCloser = close.switchMap(x => // start when close emits
  Observable.timer(cleanupTimeout) // emit once after cleanup time
)
  .mapTo('cleanupTimeoutMarker')
Here's a working sample CodePen (please run tests one at a time)
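The two-phase split can also be checked without RxJS. The sketch below is a plain-JavaScript model of what regulated and cleanup are meant to let through once the close time is known. The helper name splitPhases, the packet shape { at, data } and the sample values are assumptions for illustration, and the boundaries are treated as half-open intervals, which may differ from how same-tick emissions are ordered in a real stream.

```javascript
// Hypothetical helper, not part of RxJS: sorts packets into the two phases.
// `closeAt` is when a closer fired; `cleanupTimeout` is the grace period after it.
function splitPhases(packets, closeAt, cleanupTimeout) {
  const cleanupEnd = closeAt + cleanupTimeout;
  return {
    // Roughly source.takeUntil(close): everything before the close signal.
    regulated: packets.filter(p => p.at < closeAt),
    // Roughly source.skipUntil(close).takeUntil(cleanupCloser): the grace period.
    cleanup: packets.filter(p => p.at >= closeAt && p.at < cleanupEnd),
    // Anything later is dropped.
    ignored: packets.filter(p => p.at >= cleanupEnd),
  };
}

const samplePackets = [
  { at: 100, data: 'a' },
  { at: 250, data: 'b' },
  { at: 400, data: 'c' },
  { at: 700, data: 'final' },    // sent during cleanup
  { at: 1200, data: 'too late' } // arrives after the cleanup window
];

// Close fired at 600 ms with a 300 ms cleanup window.
const phases = splitPhases(samplePackets, 600, 300);
console.log(phases.regulated.map(p => p.data)); // [ 'a', 'b', 'c' ]
console.log(phases.cleanup.map(p => p.data));   // [ 'final' ]
console.log(phases.ignored.map(p => p.data));   // [ 'too late' ]
```

The merged output in the answer corresponds to the regulated packets followed by the cleanup packets.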
It's hard to give any advice without knowing how you create the "streams" with RxJS or how you want to use them later.
In general you could achieve what you want with just takeUntil(), switchMap() and timeout().
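Observable.defer(...)
  .startWith(undefined) // Trigger the first `timeout`
  .switchMap((val, i) => {
    if (i === 0) { // waiting for the first value
      // never() keeps the inner Observable open; of() with no arguments
      // completes immediately, so timeout() would never fire
      return Observable.never().timeout(startDelay);
    } else {
      // emit the value, then stay open until the next value or the timeout
      return Observable.of(val).concat(Observable.never()).timeout(middleDelay);
    }
  })
  .takeUntil(Observable.timer(maxChannelTime));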
I don't know what you mean by "close the stream at some point". Do you expect the error or complete notification? This solution will emit error when a timeout expires and complete if takeUntil emits.
In the end, this is what I did. My answer is mainly based on Richard Matsen's answer, so I'm leaving his answer as accepted.
There turned out to be a few additional changes I needed to make.
This code takes in a stream of data messages and returns a singleton (single-value) observable containing all the data gathered and the reason for termination.
let startCloser$ = Observable.timer(this.options.maxStartDelay)
    .takeUntil(dataStream$)
    .mapTo(TerminationReason.StartTimeout);
let intervalCloser$ = dataStream$.switchMap(x =>
    Observable.timer(this.options.timeBetweenPackets).mapTo(TerminationReason.Inactivity));
let maxTimeCloser$ = Observable.timer(this.options.totalConnectionTime)
    .takeUntil(startCloser$)
    .takeUntil(intervalCloser$)
    .mapTo(TerminationReason.ChannelTimeout);

//we need to publishReplay it so we can get the reason afterwards...
let close$ = startCloser$.merge(intervalCloser$, maxTimeCloser$).take(1).publishReplay(1);
//basically treating close$ like a promise
close$.connect();

//cleanupAction has side-effects so it must only be subscribed to once.
let cleanupAction$ = Observable.defer(async () => {
    //it's just a promise that yields nothing and waits until requestTermination has terminated
    //requestTermination is an async function and it already has a timeout thing in promise-language
    await this.requestTermination();
});

let result$ = dataStream$.takeUntil(close$)
    .concat(dataStream$.takeUntil(cleanupAction$))
    .toArray()
    .switchMap(arrs => {
        //switchMap will only resolve once because the observable is a singleton
        return close$.map(reason => {
            //this should fire immediately because close is publishReplay(1) and has already happened
            let totalArr = _.flattenDeep(arrs);
            return {
                reason : reason,
                data : totalArr
            }
        })
    });
return result$;