RxJS: Recursive list of observables and single observer

I've been having some trouble with a recursive chain of observables.

I am working with RxJS, which is currently in version 1.0.10621, and contains most basic Rx functionality, in conjunction with Rx for jQuery.

Let me introduce an example scenario for my problem: I am polling the Twitter search API (JSON response) for tweets/updates containing a certain keyword. The response also includes a "refresh_url" which one should use to generate follow-up request. The response to that follow-up request will again contain a new refresh_url, etc.

Rx.jQuery allows me to make the Twitter search API call an observable event, which produces one onNext and then completes. What I have tried so far is to have the onNext handler remember the refresh_url and use it in the onCompleted handler to produce both a new observable and corresponding observer for the next request. This way, one observable + observer pair follows the other indefinitely.

The problem with this approach is:

  • The follow-up observable/observer are already alive when their predecessors have not yet been disposed of.

  • I have to do lots of nasty bookkeeping to maintain a valid reference to the currently living observer, of which there can actually be two. (One in onCompleted and the other somewhere else in its life-cycle) This reference is, of course, needed to unsubscribe/dispose of the observer. An alternative to the bookkeeping would be to implement a side effect by the means of a "still running?"-boolean, as I have done in my example.

  • Example code:

                running = true;
                twitterUrl = "http://search.twitter.com/search.json";
                twitterQuery = "?rpp=10&q=" + encodeURIComponent(text);
                twitterMaxId = 0; //actually twitter ignores its since_id parameter
    
                newTweetObserver = function () {
                    return Rx.Observer.create(
                            function (tweet) {
                                if (tweet.id > twitterMaxId) {
                                    twitterMaxId = tweet.id;
                                    displayTweet(tweet);
                                }
                            }
                        );
                }
    
                createTwitterObserver = function() {
                    twitterObserver = Rx.Observer.create(
                            function (response) {
                                if (response.textStatus == "success") {
                                    var data = response.data;
                                    if (data.error == undefined) {
                                        twitterQuery = data.refresh_url;
                                        var tweetObservable;
                                        tweetObservable = Rx.Observable.fromArray(data.results.reverse());
                                        tweetObservable.subscribe(newTweetObserver());
                                    }
                                }
                            },
                            function(error) { alert(error); },
                            function () {
                                //create and listen to new observer that includes a delay 
                                if (running) {
                                    twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery).delay(3000);
                                    twitterObservable.subscribe(createTwitterObserver());
                                }
                            } 
                        );
                    return twitterObserver;
                }
                twitterObservable = $.getJSONPAsObservable(twitterUrl, twitterQuery);
                twitterObservable.subscribe(createTwitterObserver());
    

    Don't be fooled by the double layer of observables/observers from requests to tweets. My example concerns itself mainly with the first layer: requesting data from Twitter. If in solving this problem the second layer (converting responses into tweets) can become one with the first one, that would be fantastic; But i think that's a whole different thing. For now.

    Erik Meijer pointed out the Expand operator to me (see example below), and suggested Join patterns as an alternative.

    var ys = Observable.Expand
    (new[]{0}.ToObservable() // initial sequence
                       , i => ( i == 10 ? Observable.Empty<int>() // terminate
             : new[]{i+1}.ToObservable() // recurse
     )
    );
    
    ys.ToArray().Select(a => string.Join(",", a)).DumpLive();
    

    This should be copy-pastable into LINQPad. It assumes singleton observables and produces one final observer.

    So my question is: How can I do the expand trick nicest in RxJS?

    EDIT:
    The expand operator can probably be implemented as shown in this thread. But one would need generators (and I only have JS < 1.6).
    Unfortunately RxJS 2.0.20304-beta does not implement the Extend method.


    So i am going to attempt to solve your problem slightly different than you did and take some liberties that you can solve easier.

    So 1 thing i cannot tell is that are you attempting to the following steps

  • Get the first tweet list, which contains next url
  • Once tweet list received, onNext the current observer and get the next set of tweets
  • Do this indefinitely
  • Or is there a user action (the get more / scrolling to bottom). Either way, its really the same problem. I may be reading your problem incorrectly though. Here is the answer for that.

    function getMyTweets(headUrl) {
        return Rx.Observable.create(function(observer) {
    
            innerRequest(headUrl);
            function innerRequest(url) {
                var next = '';
    
                // Some magic get ajax function
                Rx.get(url).subscribe(function(res) {
                    observer.onNext(res);
                    next = res.refresh_url;
                },
                function() {
                    // Some sweet handling code
                    // Perhaps get head?
                },
                function() {
                    innerRequest(next);
                });
            }
        });
    }
    

    This may not be the answer you were asking for. If not, sorry!


    Edit: After looking through your code, it looks like you want to take results as an array and observablize it.

    // From the results perform a select then a merge (if ordering does not matter).
    getMyTweets('url')
        .selectMany(function(data) {
            return Rx.Observable.fromArray(data.results.reverse());
        });
    
    // Ensures ordering
    getMyTweets('url')
        .select(function(data) {
            return Rx.Observable.fromArray(data.results.reverse());
        })
        .concat();
    
    链接地址: http://www.djcxy.com/p/58560.html

    上一篇: 在水豚的新窗口中打开链接

    下一篇: RxJS:可观察对象和单个观察者的递归列表