我如何使用事件和承诺来控制程序流?

我有这样一个班级:

import net from 'net';
import {EventEmitter} from 'events';
import Promise from 'bluebird';

class MyClass extends EventEmitter {
    constructor(host = 'localhost', port = 10011) {
        super(EventEmitter);
        this.host = host;
        this.port = port;
        this.socket = null;
        this.connect();
    }
    connect() {
        this.socket = net.connect(this.port, this.host);
        this.socket.on('connect', this.handle.bind(this));
    }
    handle(data) {
        this.socket.on('data', data => {

        });
    }
    send(data) {
        this.socket.write(data);
    }
}

我将如何将send方法转换为承诺,该承诺从套接字的data事件返回一个值? 当数据发送给服务器时,服务器只发回数据,而不是一个容易被抑制的连接消息。

我试过类似的东西:

handle(data) {
    this.socket.on('data', data => {
        return this.socket.resolve(data);
    });
    this.socket.on('error', this.socket.reject.bind(this));
}
send(data) {
    return new Promise((resolve, reject) => {
        this.socket.resolve = resolve;
        this.socket.reject = reject;
        this.socket.write(data);
    });
}

显然这不起作用,因为当链接和/或调用并行send多次时, resolve / reject会相互覆盖。

还有两次并行调用send的问题,它会解决哪个响应首先返回。

我目前有一个实现使用队列和推迟,但它感觉很混乱,因为队列不断被检查。

我希望能够做到以下几点:

let c = new MyClass('localhost', 10011);
c.send('foo').then(response => {
    return c.send('bar', response.param);
    //`response` should be the data returned from `this.socket.on('data')`.
}).then(response => {
    console.log(response);
}).catch(error => console.log(error));

只需添加,我无法控制收到的数据,这意味着它不能在流之外进行修改。

编辑 :所以这似乎是不可能的,由于TCP没有请求 - 响应流。 这仍然可以如何使用承诺来实现,但是使用单一执行(一次一个请求)承诺链或队列。


我将问题简化到最低限度,并使浏览器可运行:

  • 套接字类被嘲笑。
  • EventEmitter删除了有关端口,主机和继承的EventEmitter
  • 该解决方案通过将新请求附加到承诺链上,但在任何给定时间点允许最多一个开放/未答复请求。 .send每次调用时会返回一个新的promise,并且该类负责处理所有内部同步。 因此, .send可以被多次调用,并且保证了请求处理的正确顺序(FIFO)。 我添加的一个附加功能是修剪承诺链,如果没有未处理的请求。


    注意我完全省略了错误处理,但是无论如何都应该针对您的特定用例量身定制。


    DEMO

    class SocketMock {
    
      constructor(){
        this.connected = new Promise( (resolve, reject) => setTimeout(resolve,200) ); 
        this.listeners = {
      //  'error' : [],
        'data' : []
        }
      }
    
      send(data){
    
        console.log(`SENDING DATA: ${data}`);
        var response = `SERVER RESPONSE TO: ${data}`;
        setTimeout( () => this.listeners['data'].forEach(cb => cb(response)),               
                   Math.random()*2000 + 250); 
      }
    
      on(event, callback){
        this.listeners[event].push(callback); 
      }
    
    }
    
    class SingleRequestCoordinator {
    
        constructor() {
            this._openRequests = 0; 
            this.socket = new SocketMock();
            this._promiseChain = this.socket
                .connected.then( () => console.log('SOCKET CONNECTED'));
          this.socket.on('data', (data) => {
            this._openRequests -= 1;
            console.log(this._openRequests);
            if(this._openRequests === 0){
              console.log('NO PENDING REQUEST --- trimming the chain');
              this._promiseChain = this.socket.connected
            }
            this._deferred.resolve(data);
          });
    
        }
    
        send(data) {
          this._openRequests += 1;
          this._promiseChain = this._promiseChain
            .then(() => {
                this._deferred = Promise.defer();
                this.socket.send(data);
                return this._deferred.promise;
            });
          return this._promiseChain;
        }
    }
    
    var sender = new SingleRequestCoordinator();
    
    sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
    sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
    sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
    
    setTimeout(() => sender.send('data-4')
        .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
    

    如果send()调用彼此混淆,则应将其保存到缓存中。 为了确保接收到的消息与发送的消息相匹配,您应该为每个消息分配一些唯一的id到负载中。

    所以你的消息发送者将看起来像这样

    class MyClass extends EventEmitter {
      constructor() {
        // [redacted]
        this.messages = new Map();
      }
    
      handle(data) {
        this.socket.on('data', data => {
           this.messages.get(data.id)(data);
           this.messages.delete(data.id);
        });
      }
    
      send(data) {
        return return new Promise((resolve, reject) => {
            this.messages.set(data.id, resolve);
            this.socket.write(data);
        });
      }
    }
    

    此代码对于消息顺序不明智,您将获得所需的API。


    socket.write(data[, encoding][, callback])接受回调。 您可以拒绝或解决此回调。

    class MyClass extends EventEmitter {
      constructor(host = 'localhost', port = 10011) {
        super(EventEmitter);
        this.host = host;
        this.port = port;
        this.socket = null;
        this.requests = null;
        this.connect();
      }
      connect() {
        this.socket = net.connect(this.port, this.host);
        this.socket.on('connect', () => {
          this.requests = [];
          this.socket.on('data', this.handle.bind(this));
          this.socket.on('error', this.error.bind(this));
        });
      }
      handle(data) {
        var [request, resolve, reject] = this.requests.pop();
        // I'm not sure what will happen with the destructuring if requests is empty
        if(resolve) {
          resolve(data);
        }
      }
      error(error) {
        var [request, resolve, reject] = this.requests.pop();
        if(reject) {
          reject(error);
        }
      }
      send(data) {
        return new Promise((resolve, reject) => {
          if(this.requests === null) {
            return reject('Not connected');
          }
          this.requests.push([data, resolve, reject]);
          this.socket.write(data);
        });
      }
    }
    

    未经测试,因此不确定方法签名,但这是基本的想法。

    这假定每个请求都会有一个handleerror事件。

    我对它的考虑越多,如果应用程序数据中没有附加信息,例如数据包编号与请求的响应相匹配,这似乎就不可能。

    现在它的实现方式(以及它在你的问题中的方式),它甚至不能确定一个答案只能匹配一个handle事件。

    链接地址: http://www.djcxy.com/p/27491.html

    上一篇: How can I control program flow using events and promises?

    下一篇: increment via jQuery