RxJS:可观察对象和单个观察者的递归列表

我一直在观察对象的递归链中遇到一些麻烦。

我正在使用RxJS,它目前在版本1.0.10621中,并且包含大部分基本的Rx功能,以及Rx for jQuery。

让我为我的问题介绍一个示例场景:我正在轮询包含特定关键字的推文/更新的Twitter搜索API(JSON响应)。 响应还包括一个“refresh_url”,用于生成后续请求。 对该后续请​​求的响应将再次包含新的refresh_url等。

Rx.jQuery允许我使Twitter搜索API调用一个可观察事件,该事件会产生一个onNext,然后完成。 我到目前为止所尝试的是让onNext处理程序记住refresh_url,并在onCompleted处理程序中使用它来为下一个请求生成一个新的observable和相应的观察者。 这样,一个可观察+观察者对无限期地跟随另一个。

这种方法的问题是:

  • 后续的观察员/观察员在他们的前任尚未处理时已经存在。

  • 我必须做很多讨厌的簿记才能保持对当前活跃观察者的有效参考,其中实际上有两个观察者。 (一个在onCompleted中,另一个在其生命周期中的其他地方)当然,这个引用需要取消订阅/处理观察者。 除了簿记以外,还可以通过“仍在运行?”的方式实现副作用 - 布尔值,正如我在示例中所做的那样。

  • 示例代码:

                running = true;
                twitterUrl = "http://search.twitter.com/search.json";
                twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
                twitterMaxId = 0; //actually twitter ignores its since_id parameter
    
                newTweetObserver = function () {
                    return Rx.Observer.create(
                            function (tweet) {
                                if (tweet.id > twitterMaxId) {
                                    twitterMaxId = tweet.id;
                                    displayTweet(tweet);
                                }
                            }
                        );
                }
    
                createTwitterObserver = function() {
                    twitterObserver = Rx.Observer.create(
                            function (response) {
                                if (response.textStatus == "success") {
                                    var data = response.data;
                                    if (data.error == undefined) {
                                        twitterQuery = data.refresh_url;
                                        var tweetObservable;
                                        tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                                        tweetObservable.subscribe(newTweetObserver());
                                    }
                                }
                            },
                            function(error) { alert(error); },
                            function () {
                                //create and listen to new observer that includes a delay 
                                if (running) {
                                    twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                                    twitterObservable.subscribe(createTwitterObserver());
                                }
                            } 
                        );
                    return twitterObserver;
                }
                twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
                twitterObservable.subscribe(createTwitterObserver());
    

    不要被来自请求推文的双层观察者/观察者愚弄。 我的例子主要关注第一层:从Twitter请求数据。 如果在解决这个问题时第二层(将响应转换为推文)可以与第一层一起使用,那就太棒了; 但我认为这是完全不同的事情。 目前。

    Erik Meijer向我指出了Expand运算符(参见下面的示例),并建议使用Join模式作为替代。

    var ys = Observable.Expand
    (new[]{0}.ToObservable() // initial sequence
                       , i => ( i == 10 ? Observable.Empty<int>() // terminate
             : new[]{i+1}.ToObservable() // recurse
     )
    );
    
    ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
    

    这应该可以复制到LINQPad中。 它假设单身可观测量并产生一个最终观察者。

    所以我的问题是:我如何在RxJS中做最好的扩展技巧?

    编辑:
    扩展运算符可能可以按照此线程中显示的那样来实现。 但是需要生成器(我只有JS <1.6)。
    不幸的是RxJS 2.0.20304-beta没有实现Extend方法。


    所以我会试着解决你的问题,比你做的稍微有点不同,并采取一些自由,你可以更轻松地解决问题。

    所以我不能说的一件事是,你是否试图进行以下步骤

  • 获取包含下一个网址的第一条推文列表
  • 一旦收到鸣叫列表,onNext当前观察者并获取下一组推文
  • 无限期地这样做
  • 还是有用户操作(获取更多/滚动到底部)。 无论哪种方式,它都是同样的问题。 我可能会错误地阅读你的问题。 这是答案。

    function getMyTweets(headUrl) {
        return Rx.Observable.create(function(observer) {
    
            innerRequest(headUrl);
            function innerRequest(url) {
                var next = '';
    
                // Some magic get ajax function
                Rx.get(url).subscribe(function(res) {
                    observer.onNext(res);
                    next = res.refresh_url;
                },
                function() {
                    // Some sweet handling code
                    // Perhaps get head?
                },
                function() {
                    innerRequest(next);
                });
            }
        });
    }
    

    这可能不是你要求的答案。 如果没有,对不起!


    编辑:查看代码后,它看起来像要将结果作为数组并观察它。

    // From the results perform a select then a merge (if ordering does not matter).
    getMyTweets('url')
        .selectMany(function(data) {
            return Rx.Observable.fromArray(data.results.reverse());
        });
    
    // Ensures ordering
    getMyTweets('url')
        .select(function(data) {
            return Rx.Observable.fromArray(data.results.reverse());
        })
        .concat();
    
    链接地址: http://www.djcxy.com/p/58559.html

    上一篇: RxJS: Recursive list of observables and single observer

    下一篇: Efficient way to iterate over list of files