根据rxjs中的时间处理事件流
我有一个进程每隔一段时间发送一次数据包,我需要根据数据包何时到达的时间来管理这个数据流,等等。 在某个时候,我也关闭了流和过程。
现在,我使用一组定时器来做到这一点,但我希望我可以用rxjs
做到这一点,因为它似乎非常适合这种事情。 到目前为止,我还没有取得太多的成功。
问题
该流应该定期向我发送数据包,但它通常偏离很多,有时会卡住。
在下列情况下,我想在某个时候关闭流:
startDelay
发送第一个数据包需要多于startDelay
。 middleDelay
的暂停。 maxChannelTime
。 由于上述任何原因,当我即将关闭流时,我首先要求它有礼貌地关闭,以便可以进行一些清理。 有时它也会在清理过程中向我发送最终的数据包。 但是我希望不再等待cleanupTime
清理和最后的数据到达,然后关闭流并忽略更多消息。
精
我将通过用Observable包装事件来创建“流”。 我没有这样做的麻烦。
通过“关闭”流,我的意思是告诉流程停止发送数据,并可能关闭(即死亡)。
棘手的问题。
我已经把它分为两个阶段 - “规范”(因为我们要定期检查)和“清理”。
反向工作,输出是
const regulated = source.takeUntil(close)
const cleanup = source.skipUntil(close).takeUntil(cleanupCloser)
const output = regulated.merge(cleanup)
'关闭器'是在关闭时发出的可观察结果(每超时值一个关闭)。
const startTimeout = 600
const intervalTimeout = 200
const maxtimeTimeout = 3000
const cleanupTimeout = 300
const startCloser = Observable.timer(startTimeout) // emit once after initial delay
.takeUntil(source) // cancel after source emits
.mapTo('startTimeoutMarker')
const intervalCloser = source.switchMap(x => // reset interval after each source emit
Observable.timer(intervalTimeout) // emit once after intervalTimeout
.mapTo('intervalTimeoutMarker')
)
const maxtimeCloser = Observable.timer(maxtimeTimeout) // emit once after maxtime
.takeUntil(startCloser) // cancel if startTimeout
.takeUntil(intervalCloser) // cancel if intervalTimeout
.mapTo('maxtimeTimeoutMarker')
const close = Observable.merge(startCloser, intervalCloser, maxtimeCloser).take(1)
const cleanupCloser = close.switchMap(x => // start when close emits
Observable.timer(cleanupTimeout) // emit once after cleanup time
)
.mapTo('cleanupTimeoutMarker')
这里有一个工作示例CodePen(请一次运行一个测试)
如果不知道如何使用RxJS创建“流”,或者您希望以后如何使用它们,则很难给出任何建议。
一般来说,只要使用takeUntil()
, switchMap()
和timeout()
就可以实现你想要的。
Observable.defer(...)
.startWith(undefined) // Trigger the first `timeout`
.switchMap((val, i) => {
if (i === 0) { // waiting for the first value
return Observable.of().timeout(startDelay);
} else {
return Observable.of(val).timeout(middleDelay);
}
})
.takeUntil(Observable.timer(maxChannelTime));
我不知道你的意思是“在某一时刻关闭这条河流”。 你期望error
还是complete
通知? 此解决方案将在超时过期时发出error
,并在takeUntil
发出时complete
。
最后,这就是我所做的。 我的回答主要基于理查德马森的回答,所以我将他的答案留给接受。
事实证明,我需要做一些额外的修改。
该代码是接收数据消息流并返回包含收集的所有数据和终止原因的单例观察值的代码。
let startCloser$ = Observable.timer(this.options.maxStartDelay).takeUntil(dataStream$).mapTo(TerminationReason.StartTimeout);
let intervalCloser$ = dataStream$.switchMap(x => Observable.timer(this.options.timeBetweenPackets).mapTo(TerminationReason.Inactivity));
let maxTimeCloser$ = Observable.timer(this.options.totalConnectionTime).takeUntil(startCloser$).takeUntil(intervalCloser$).mapTo(TerminationReason.ChannelTimeout);
//we need to publishReplay it so we can get the reason afterwards...
let close$ = startCloser$.merge(intervalCloser$, maxTimeCloser$).take(1).publishReplay(1);
//basically treating close$ like a promise
close$.connect();
//cleanupAction has side-effects so it must only be subscribed to once.
let cleanupAction$ = Observable.defer(async () => {
//it's just a promise that yields nothing and waits until requestTermination has terminated
//requestTermination is an async function and it already has a timeout thing in promise-language
await this.requestTermination();
});
let result$ = dataStream$.takeUntil(close$).concat(dataStream$.takeUntil(cleanupAction$)).toArray().switchMap(arrs => {
//switchMap will only resolve once because the observable is a singleton
return close$.map(reason => {
//this should fire immediately because close is publishReplay(1) and has already happened
let totalArr = _.flattenDeep(arrs);
return {
reason : reason,
data : totalArr
}
})
});
return result$;
链接地址: http://www.djcxy.com/p/40721.html
上一篇: Handling a stream of events based on timing in rxjs
下一篇: How do I get the HTML contained within a td using jQuery?