How can I control program flow using events and promises?
I have a class like so:
import net from 'net';
import {EventEmitter} from 'events';
import Promise from 'bluebird';

class MyClass extends EventEmitter {
    constructor(host = 'localhost', port = 10011) {
        super();
        this.host = host;
        this.port = port;
        this.socket = null;
        this.connect();
    }
    connect() {
        this.socket = net.connect(this.port, this.host);
        this.socket.on('connect', this.handle.bind(this));
    }
    handle(data) {
        this.socket.on('data', data => {
        });
    }
    send(data) {
        this.socket.write(data);
    }
}
How would I turn the send method into a promise, which returns a value from the socket's data event? The server only sends data back when data is sent to it, other than a connection message which can easily be suppressed.
I've tried something like:
handle(data) {
    this.socket.on('data', data => {
        return this.socket.resolve(data);
    });
    this.socket.on('error', this.socket.reject.bind(this));
}
send(data) {
    return new Promise((resolve, reject) => {
        this.socket.resolve = resolve;
        this.socket.reject = reject;
        this.socket.write(data);
    });
}
Obviously this won't work because resolve / reject will overwrite each other when chaining and/or calling send multiple times in parallel.
There's also the problem of calling send twice in parallel and it resolving whichever response comes back first.
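To make the overwrite problem concrete, here is a minimal sketch. FakeSocket and NaiveClient are hypothetical stand-ins (not part of the code above) that imitate a socket answering each write after a short delay:

```javascript
// Hypothetical stand-in for the socket: replies to each write after a delay.
class FakeSocket {
    constructor() { this.onData = null; }
    write(data) {
        setTimeout(() => this.onData(`reply to ${data}`), 10);
    }
}

class NaiveClient {
    constructor() {
        this.socket = new FakeSocket();
        this.resolve = null; // a single slot shared by every send()
        this.socket.onData = (data) => this.resolve(data);
    }
    send(data) {
        return new Promise((resolve) => {
            this.resolve = resolve; // overwrites the previous resolver
            this.socket.write(data);
        });
    }
}

const client = new NaiveClient();
const settled = { first: undefined, second: undefined };
client.send('foo').then((r) => { settled.first = r; });
client.send('bar').then((r) => { settled.second = r; });

// Inspect the outcome once both replies have arrived.
const done = new Promise((resolve) => setTimeout(() => resolve(settled), 50));
done.then((s) => console.log(s));
```

In this sketch the first promise never settles, and the second one receives the reply that was meant for 'foo'.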
I currently have an implementation using a queue and deferreds, but it feels messy since the queue is constantly being checked.
I'd like to be able to do the following:
let c = new MyClass('localhost', 10011);
c.send('foo').then(response => {
    return c.send('bar', response.param);
    //`response` should be the data returned from `this.socket.on('data')`.
}).then(response => {
    console.log(response);
}).catch(error => console.log(error));
Just to add, I don't have any control over the data that is received, meaning it can't be modified outside of the stream.
Edit: So it seems this is pretty much impossible, because TCP has no request-response flow. How can this still be implemented using promises, with either a single-execution (one request at a time) promise chain or a queue?
I distilled the problem to the bare minimum and made it runnable in the browser: the socket is mocked and the class no longer extends EventEmitter.
The solution works by appending new requests to a promise chain while allowing at most one open (unanswered) request at any point in time. .send returns a new promise each time it is called, and the class takes care of all internal synchronisation. So .send may be called multiple times, and requests are guaranteed to be processed in order (FIFO). One additional feature I added is trimming the promise chain when there are no pending requests.
Caveat: I omitted error handling altogether, but it should be tailored to your particular use case anyway.
DEMO
class SocketMock {
    constructor() {
        this.connected = new Promise((resolve, reject) => setTimeout(resolve, 200));
        this.listeners = {
            // 'error' : [],
            'data': []
        };
    }
    send(data) {
        console.log(`SENDING DATA: ${data}`);
        var response = `SERVER RESPONSE TO: ${data}`;
        setTimeout(() => this.listeners['data'].forEach(cb => cb(response)),
            Math.random() * 2000 + 250);
    }
    on(event, callback) {
        this.listeners[event].push(callback);
    }
}
class SingleRequestCoordinator {
    constructor() {
        this._openRequests = 0;
        this._resolveCurrent = null; // resolver of the request awaiting a response
        this.socket = new SocketMock();
        this._promiseChain = this.socket
            .connected.then(() => console.log('SOCKET CONNECTED'));
        this.socket.on('data', (data) => {
            this._openRequests -= 1;
            console.log(this._openRequests);
            if (this._openRequests === 0) {
                console.log('NO PENDING REQUEST --- trimming the chain');
                this._promiseChain = this.socket.connected;
            }
            this._resolveCurrent(data);
        });
    }
    send(data) {
        this._openRequests += 1;
        this._promiseChain = this._promiseChain
            .then(() => new Promise((resolve) => {
                // Promise.defer() is deprecated and not part of standard
                // promises, so the resolver is stored directly instead
                this._resolveCurrent = resolve;
                this.socket.send(data);
            }));
        return this._promiseChain;
    }
}

var sender = new SingleRequestCoordinator();
sender.send('data-1').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-2').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
sender.send('data-3').then(data => console.log(`GOT DATA FROM SERVER --- ${data}`));
// sent after the chain has been trimmed
setTimeout(() => sender.send('data-4')
    .then(data => console.log(`GOT DATA FROM SERVER --- ${data}`)), 10000);
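As for the error handling that the demo leaves out, one possible direction is a per-request timeout plus a chain that keeps going after a failure. The following is only a sketch under assumptions: FakeSocket, SerialClient, the 100 ms timeout and the 'lost' payload are all made up for illustration.

```javascript
// Hypothetical stand-in for a socket: answers every write after a short
// delay, except for the payload 'lost', which never gets a response.
class FakeSocket {
    constructor() { this.onData = null; }
    write(data) {
        if (data === 'lost') return;
        setTimeout(() => this.onData(`reply to ${data}`), 10);
    }
}

class SerialClient {
    constructor(socket, timeoutMs = 100) {
        this.socket = socket;
        this.timeoutMs = timeoutMs;
        this.pending = null;            // { resolve, reject, timer } of the request in flight
        this.chain = Promise.resolve(); // tail of the request queue
        this.socket.onData = (data) => this.settle('resolve', data);
    }
    settle(method, value) {
        if (!this.pending) return;      // unsolicited data: nothing is waiting
        const pending = this.pending;
        this.pending = null;
        clearTimeout(pending.timer);
        pending[method](value);
    }
    send(data) {
        const run = () => new Promise((resolve, reject) => {
            const timer = setTimeout(
                () => this.settle('reject', new Error(`timeout waiting for reply to ${data}`)),
                this.timeoutMs);
            this.pending = { resolve, reject, timer };
            this.socket.write(data);
        });
        const result = this.chain.then(run);
        // The queue moves on whether this request succeeded or failed.
        this.chain = result.catch(() => {});
        return result;
    }
}

const client = new SerialClient(new FakeSocket());
const outcomes = Promise.all([
    client.send('a'),
    client.send('lost').catch((err) => err.message),
    client.send('b'),
]);
outcomes.then((values) => console.log(values));
```

One limitation of this sketch: a reply that arrives after its request has timed out would be handed to the next request in the queue, so the timeout has to be chosen generously or the connection reset on timeout.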
If your send() calls interfere with each other, you should store the pending calls in a cache. To make sure that a received message is matched with the one that was sent, assign a unique id to each message in its payload.
So your message sender will look like this:
class MyClass extends EventEmitter {
    constructor() {
        // [redacted]
        this.messages = new Map();
    }
    handle(data) {
        this.socket.on('data', data => {
            // look up the resolver stored for this id and settle its promise
            this.messages.get(data.id)(data);
            this.messages.delete(data.id);
        });
    }
    send(data) {
        return new Promise((resolve, reject) => {
            this.messages.set(data.id, resolve);
            this.socket.write(data);
        });
    }
}
This code is not sensitive to message order, and it gives you the API you want.
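The snippet above does not show where the id comes from or how it travels over the wire. A minimal sketch, assuming the server echoes the id back and that messages are JSON strings (both are assumptions, and the question states the server's data cannot be changed); EchoSocket and CorrelatingClient are hypothetical names:

```javascript
// Hypothetical stand-in for a connection whose server echoes the request id
// and deliberately answers out of order (later requests are answered sooner).
class EchoSocket {
    constructor() { this.onData = null; this.delay = 30; }
    write(raw) {
        const request = JSON.parse(raw);
        const reply = JSON.stringify({ id: request.id, body: `reply to ${request.body}` });
        setTimeout(() => this.onData(reply), this.delay);
        this.delay -= 10;
    }
}

class CorrelatingClient {
    constructor(socket) {
        this.socket = socket;
        this.nextId = 1;
        this.messages = new Map(); // id -> { resolve, reject }
        this.socket.onData = (raw) => {
            const message = JSON.parse(raw);
            const entry = this.messages.get(message.id);
            if (!entry) return; // unknown or already-settled id
            this.messages.delete(message.id);
            entry.resolve(message.body);
        };
    }
    send(body) {
        return new Promise((resolve, reject) => {
            const id = this.nextId++; // unique per client instance
            this.messages.set(id, { resolve, reject });
            this.socket.write(JSON.stringify({ id, body }));
        });
    }
}

const client = new CorrelatingClient(new EchoSocket());
const replies = Promise.all([client.send('foo'), client.send('bar'), client.send('baz')]);
replies.then((values) => console.log(values));
```

Even though the replies arrive in reverse order here, each promise resolves with the reply to its own request.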
socket.write(data[, encoding][, callback]) takes a callback. You can reject or resolve in this callback.
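A rough sketch of wrapping that callback in a promise follows. writeAsync and makeFakeSocket are hypothetical helpers; the fake only mimics the write(data, encoding, callback) shape so the example runs without a server.

```javascript
// Wraps the callback form of write() in a promise: resolves once the data has
// been flushed, rejects if the callback receives an error.
function writeAsync(socket, data, encoding = 'utf8') {
    return new Promise((resolve, reject) => {
        socket.write(data, encoding, (err) => (err ? reject(err) : resolve()));
    });
}

// Hypothetical stand-in with the same write(data, encoding, callback) shape.
// `fail` makes every write report an error.
function makeFakeSocket(fail = false) {
    return {
        write(data, encoding, callback) {
            setTimeout(() => callback(fail ? new Error('write failed') : undefined), 0);
            return true;
        },
    };
}

const results = Promise.all([
    writeAsync(makeFakeSocket(), 'foo').then(() => 'flushed'),
    writeAsync(makeFakeSocket(true), 'bar').catch((err) => err.message),
]);
results.then((values) => console.log(values));
```

Note that such a promise only signals that the data was written out. It does not carry the server's response, so matching responses to requests still needs something like the approaches discussed here.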
class MyClass extends EventEmitter {
    constructor(host = 'localhost', port = 10011) {
        super();
        this.host = host;
        this.port = port;
        this.socket = null;
        this.requests = null;
        this.connect();
    }
    connect() {
        this.socket = net.connect(this.port, this.host);
        this.socket.on('connect', () => {
            this.requests = [];
            this.socket.on('data', this.handle.bind(this));
            this.socket.on('error', this.error.bind(this));
        });
    }
    handle(data) {
        // shift() takes the oldest pending request (FIFO); fall back to an
        // empty array because destructuring undefined throws a TypeError
        var [request, resolve, reject] = this.requests.shift() || [];
        if (resolve) {
            resolve(data);
        }
    }
    error(error) {
        var [request, resolve, reject] = this.requests.shift() || [];
        if (reject) {
            reject(error);
        }
    }
    send(data) {
        return new Promise((resolve, reject) => {
            if (this.requests === null) {
                return reject(new Error('Not connected'));
            }
            this.requests.push([data, resolve, reject]);
            this.socket.write(data);
        });
    }
}
Not tested, so I'm not sure about the method signatures, but that's the basic idea.
This assumes that there will be one handle or error event per request made.
The more I think about it, this seems impossible without additional information in your application data, such as packet numbers to match a response to a request.
The way it's implemented now (and also the way it is in your question), it's not even certain that one answer will correspond to exactly one handle event.
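Because TCP delivers a byte stream, one response may be split across several data events, or several responses may arrive in one. If the protocol has a message delimiter, the chunks can be reassembled before a request is resolved. A minimal sketch, assuming each response ends with a newline and that chunks arrive as strings (for example after socket.setEncoding('utf8')); createLineFramer is a hypothetical helper:

```javascript
// Returns a function to be used as the 'data' handler. It buffers incoming
// chunks and calls onMessage once per complete newline-terminated message.
function createLineFramer(onMessage) {
    let buffer = '';
    return (chunk) => {
        buffer += chunk;
        let newline;
        while ((newline = buffer.indexOf('\n')) !== -1) {
            onMessage(buffer.slice(0, newline));
            buffer = buffer.slice(newline + 1);
        }
    };
}

// Simulated data events whose boundaries do not line up with the messages.
const messages = [];
const onData = createLineFramer((message) => messages.push(message));
onData('first resp');
onData('onse\nsecond\nthi');
onData('rd\n');
console.log(messages);
```

With framing like this in place, the handler passed to createLineFramer would be the place to resolve the oldest pending request, rather than the raw data event.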