Resume flow after write stream emits an error?
I'm using Node.js streams as a pipeline for data processing. The problem I'm running into is that as soon as my writeStream destination emits an error, the pipeline stops flowing data to the writeStream. Indeed, it looks like pipe is implemented that way: as soon as a single error is emitted, the writeStream gets detached from the pipe. https://github.com/joyent/node/blob/master/lib/stream.js#L89-112
This seems a bit aggressive for what I want though. I'd like to be able to make note of the error, but keep the pipeline going. How can I do this?
My stream looks like this:
client.readStream()
  .pipe(process1)
  .pipe(process2)
  .pipe(process3)
  .pipe(mongo.writeStream())
  .on('data', console.log)
  .on('error', console.log);
process1, process2, and process3 are implemented as .map() from the event-stream library.
The mongo write stream looks like this.
var Stream = require('stream');
var util = require('util');

function MongoStream(_mongo) {
  Stream.Writable.call(this, { objectMode : true });
}
util.inherits(MongoStream, Stream.Writable);

MongoStream.prototype._write = function (model, encoding, callback) {
  var self = this;
  console.log("Saving model");
  model.save(function(err, result) {
    if(err){
      // Emitting 'error' here is what makes .pipe detach this stream.
      self.emit('error', err);
    } else {
      console.log("model saved");
      self.emit('data', result);
    }
    // Always signal completion so the next model can be written.
    callback();
  });
};
The read stream is simply a .find().stream() from a mongoose model.
Use a name other than error for the event:
self.emit('error', err);
could be, say:
self.emit('err', err);
That wouldn't trigger the error listener that .pipe installs on the destination, so the stream stays attached. Then listen for it with .on('err', console.log). This might be better than attempting to change .pipe so that it does nothing on error.
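To illustrate the idea, here is a minimal sketch of a writable stream that reports failures on a custom 'err' event and keeps accepting data. SaveStream, run, the 'saved' event name, and the rule that odd ids fail are all hypothetical stand-ins for MongoStream and model.save(), and Readable.from assumes a reasonably recent Node.js version.

```javascript
const { Readable, Writable } = require('stream');

// Hypothetical stand-in for MongoStream: "saving" fails for odd ids.
class SaveStream extends Writable {
  constructor() {
    super({ objectMode: true });
  }

  _write(model, encoding, callback) {
    // Simulate an asynchronous save() that sometimes fails.
    setImmediate(() => {
      if (model.id % 2 === 1) {
        // Custom event name: .pipe only reacts to 'error', not to 'err'.
        this.emit('err', new Error('save failed for ' + model.id));
      } else {
        this.emit('saved', model);
      }
      // Call back without an error so the stream keeps accepting writes.
      callback();
    });
  }
}

// Pipes the given models into a SaveStream and collects what happened.
function run(models) {
  return new Promise((resolve) => {
    const saved = [];
    const errors = [];
    const dest = new SaveStream();
    dest.on('err', (e) => errors.push(e.message));
    dest.on('saved', (m) => saved.push(m.id));
    dest.on('finish', () => resolve({ saved, errors }));
    Readable.from(models).pipe(dest);
  });
}

run([{ id: 1 }, { id: 2 }, { id: 3 }, { id: 4 }]).then(({ saved, errors }) => {
  console.log('saved:', saved);
  console.log('errors:', errors);
});
```

In this sketch the failures for ids 1 and 3 are recorded, and ids 2 and 4 are still saved afterwards, because the destination is never unpiped. The trade-off is that anything listening only for the standard 'error' event will not see these failures.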