How can I control program flow using events and promises?

I have a class like so:

import net from 'net';
import {EventEmitter} from 'events';
import Promise from 'bluebird';

class MyClass extends EventEmitter {
    constructor(host = 'localhost', port = 10011) {
        super();
        this.host = host;
        this.port = port;
        this.socket = null;
        this.connect();
    }
    connect() {
        this.socket = net.connect(this.port, this.host);
        this.socket.on('connect', this.handle.bind(this));
    }
    handle(data) {
        this.socket.on('data', data => {

        });
    }
    send(data) {
        this.socket.write(data);
    }
}

How would I turn the send method into a promise, which returns a value from the socket's data event? The server only sends data back when data is sent to it, other than a connection message which can easily be suppressed.

I've tried something like:

handle(data) {
    this.socket.on('data', data => {
        return this.socket.resolve(data);
    });
    this.socket.on('error', this.socket.reject.bind(this));
}
send(data) {
    return new Promise((resolve, reject) => {
        this.socket.resolve = resolve;
        this.socket.reject = reject;
        this.socket.write(data);
    });
}

Obviously this won't work because resolve / reject will overwrite each other when chaining and/or calling send multiple times in parallel.

There's also the problem of calling send twice in parallel and it resolving whichever response comes back first.

I currently have an implementation using a queue and deferreds, but it feels messy since the queue is constantly being checked.

I'd like to be able to do the following:

let c = new MyClass('localhost', 10011);
c.send('foo').then(response => {
    return c.send('bar', response.param);
    //`response` should be the data returned from `this.socket.on('data')`.
}).then(response => {
    console.log(response);
}).catch(error => console.log(error));

Just to add, I don't have any control over the data that is received, meaning it can't be modified outside of the stream.

Edit: It seems this is close to impossible, because TCP has no request-response flow. How can this still be implemented with promises, using either a single-execution promise chain (one request at a time) or a queue?


I distilled the problem to the bare minimum and made it browser runnable:

  • Socket class is mocked.
  • Removed info about port, host and inheritance from EventEmitter.
  • The solution works by appending each new request to a promise chain while allowing at most one open (unanswered) request at any point in time. .send returns a new promise each time it is called, and the class takes care of all internal synchronisation, so .send may be called multiple times and requests are guaranteed to be processed in order (FIFO). One additional feature is trimming the promise chain when there are no pending requests.


    Caveat: I omitted error handling altogether, but it should be tailored to your particular use case anyway.


    DEMO

    class SocketMock {
    
      constructor(){
        this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) ); 
        this.listeners = {
      //  'error' : [],
        'data' : []
        }
      }
    
      send(data){
    
        console.log(`SENDING DATA: ${data}`);
        var response = `SERVER RESPONSE TO: ${data}`;
        setTimeout( () => this.listeners['data'].forEach(cb => cb(response)),               
                   Math.random()*2000 + 250); 
      }
    
      on(event, callback){
        this.listeners[event].push(callback); 
      }
    
    }
    
    class SingleRequestCoordinator {
    
        constructor() {
            this._openRequests = 0; 
            this.socket = new SocketMock();
            this._promiseChain = this.socket
                .connected.then( () => console.log('SOCKET CONNECTED'));
          this.socket.on('data', (data) => {
            this._openRequests -= 1;
            console.log(this._openRequests);
            if(this._openRequests === 0){
              console.log('NO PENDING REQUEST --- trimming the chain');
              this._promiseChain = this.socket.connected
            }
            this._deferred.resolve(data);
          });
    
        }
    
        send(data) {
          this._openRequests += 1;
          this._promiseChain = this._promiseChain
                .then(() => new Promise(resolve => {
                    // Promise.defer() is non-standard, so keep the resolver ourselves.
                    this._deferred = { resolve };
                    this.socket.send(data);
                }));
          return this._promiseChain;
        }
    }
    
    var sender = new SingleRequestCoordinator();
    
    sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
    sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
    sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
    
    setTimeout(() => sender.send('data-4')
        .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
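
The caveat about error handling could be addressed roughly as follows. This is only a sketch under stated assumptions, not part of the demo above: `FakeSocket`, `SerialRequester` and the `timeoutMs` parameter are hypothetical names, and the mock deliberately never answers the message `'drop'` so that the timeout path can be exercised. A request that times out rejects only the promise handed to the caller, and the queue carries on with the next request.

```javascript
// Hypothetical stand-in for a socket: answers every message except 'drop'.
class FakeSocket {
  constructor() {
    this.listeners = [];
  }
  on(event, callback) {
    if (event === 'data') this.listeners.push(callback);
  }
  send(data) {
    if (data === 'drop') return; // simulate a response that never arrives
    setTimeout(() => this.listeners.forEach(cb => cb(`RESPONSE TO: ${data}`)), 10);
  }
}

class SerialRequester {
  constructor(socket, timeoutMs = 100) {
    this.socket = socket;
    this.timeoutMs = timeoutMs;
    this._pending = null;            // { resolve, reject, timer } of the open request
    this._chain = Promise.resolve(); // tail of the request queue
    this.socket.on('data', data => this._settle('resolve', data));
  }

  // Settle the open request (if any) exactly once and clear its timer.
  _settle(method, value) {
    if (!this._pending) return;      // unsolicited data: nothing is waiting
    const pending = this._pending;
    this._pending = null;
    clearTimeout(pending.timer);
    pending[method](value);
  }

  send(data) {
    const result = this._chain.then(() => new Promise((resolve, reject) => {
      const timer = setTimeout(
        () => this._settle('reject', new Error(`Timed out: ${data}`)),
        this.timeoutMs);
      this._pending = { resolve, reject, timer };
      this.socket.send(data);
    }));
    // The queue continues after a failure; only the caller sees the rejection.
    this._chain = result.catch(() => {});
    return result;
  }
}

const requester = new SerialRequester(new FakeSocket(), 50);
requester.send('foo').then(response => console.log(response));
requester.send('drop').catch(error => console.log(error.message));
requester.send('bar').then(response => console.log(response));
```

One limitation to keep in mind: if a response arrives after its request has already timed out, this sketch would hand it to whichever request is open at that moment, so on a real connection it may be safer to close the socket after a timeout.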
    

    If your send() calls get mixed up with each other, you should keep the pending requests in a cache. To be sure that a received message is matched with the one that was sent, assign a unique id to each message in its payload.

    So your message sender will look like this:

    class MyClass extends EventEmitter {
      constructor() {
        // [redacted]
        this.messages = new Map();
      }
    
      handle(data) {
        this.socket.on('data', data => {
           this.messages.get(data.id)(data);
           this.messages.delete(data.id);
        });
      }
    
      send(data) {
        return new Promise((resolve, reject) => {
            this.messages.set(data.id, resolve);
            this.socket.write(data);
        });
      }
    }
    

    This code is not sensitive to message order, and you get the API that you want.
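
To make the id matching concrete, here is a self-contained sketch. It only applies if the protocol really echoes an id back in each response, which the question suggests may not be the case. `OutOfOrderTransport` and `CorrelatedClient` are hypothetical names, and the mock answers later requests sooner, so responses arrive in the opposite order of the requests.

```javascript
// Hypothetical transport that answers out of order: each later message is answered sooner.
class OutOfOrderTransport {
  constructor() {
    this.onMessage = null;
    this.delay = 30;
  }
  write(message) {
    const reply = { id: message.id, body: `echo:${message.body}` };
    this.delay = Math.max(this.delay - 10, 0);
    setTimeout(() => this.onMessage(reply), this.delay);
  }
}

class CorrelatedClient {
  constructor(transport) {
    this.transport = transport;
    this.nextId = 1;
    this.pending = new Map(); // id -> { resolve, reject }
    this.transport.onMessage = message => this._dispatch(message);
  }

  // Route a response to the promise that was created for the same id.
  _dispatch(message) {
    const entry = this.pending.get(message.id);
    if (!entry) return;       // unknown or duplicate id: nothing is waiting
    this.pending.delete(message.id);
    entry.resolve(message.body);
  }

  send(body) {
    return new Promise((resolve, reject) => {
      const id = this.nextId++;
      this.pending.set(id, { resolve, reject });
      this.transport.write({ id, body });
    });
  }
}

const client = new CorrelatedClient(new OutOfOrderTransport());
client.send('foo').then(response => console.log('foo got', response));
client.send('bar').then(response => console.log('bar got', response));
```

Each caller receives the response to its own request even though the reply to `'bar'` arrives first. The stored `reject` is unused here; it is kept so that an error or timeout handler could fail the matching request.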


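    socket.write(data[, encoding][, callback]) takes a callback that is called once the data has been written out. You can reject or resolve in this callback, or, as in the code below, keep resolve and reject in a queue until the response arrives.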

    class MyClass extends EventEmitter {
      constructor(host = 'localhost', port = 10011) {
        super();
        this.host = host;
        this.port = port;
        this.socket = null;
        this.requests = null;
        this.connect();
      }
      connect() {
        this.socket = net.connect(this.port, this.host);
        this.socket.on('connect', () => {
          this.requests = [];
          this.socket.on('data', this.handle.bind(this));
          this.socket.on('error', this.error.bind(this));
        });
      }
      handle(data) {
        // Take the oldest pending request (FIFO); fall back to an empty array
        // because destructuring undefined would throw a TypeError.
        var [request, resolve, reject] = this.requests.shift() || [];
        if(resolve) {
          resolve(data);
        }
      }
      error(error) {
        var [request, resolve, reject] = this.requests.shift() || [];
        if(reject) {
          reject(error);
        }
      }
      send(data) {
        return new Promise((resolve, reject) => {
          if(this.requests === null) {
            return reject('Not connected');
          }
          this.requests.push([data, resolve, reject]);
          this.socket.write(data);
        });
      }
    }
    

    Not tested and therefore not sure about method signatures, but that's the basic idea.

    This assumes that there will be one handle or error event per request made.

    The more I think about it, this seems impossible without additional information in your application data, like packet numbers to match a response to a request.

    The way it's implemented now (and also the way it is in your question), it's not even certain that one answer corresponds to exactly one handle event.
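
The last point follows from TCP being a byte stream: a single response may be split across several 'data' events, and several responses may arrive merged into one. A common remedy is to buffer incoming chunks and cut them at a known message boundary. The sketch below assumes, purely for illustration, that every response ends with a newline; `createLineSplitter` is a hypothetical helper and the real protocol's framing may differ.

```javascript
// Returns a function that accepts raw chunks and calls onLine once per complete line.
function createLineSplitter(onLine, delimiter = '\n') {
  let buffer = '';
  return chunk => {
    buffer += chunk.toString();
    let index;
    // Emit every complete message currently in the buffer; keep the remainder.
    while ((index = buffer.indexOf(delimiter)) !== -1) {
      onLine(buffer.slice(0, index));
      buffer = buffer.slice(index + delimiter.length);
    }
  };
}

const lines = [];
const feed = createLineSplitter(line => lines.push(line));
feed('first re');            // one response split across two chunks
feed('sponse\nsecond\nthi'); // two responses merged into one chunk
feed('rd\n');
console.log(lines); // → [ 'first response', 'second', 'third' ]
```

With something like this in place, the splitter would be registered as the 'data' listener and the promise for the oldest pending request would be resolved once per complete message rather than once per 'data' event.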
