Resume flow after write stream emits an error?

I'm using Node.js streams as a pipeline for data processing. The problem I'm running into is that as soon as my write stream destination emits an error, the pipeline stops flowing data to it. This appears to be how pipe is implemented: as soon as a single error is emitted, the write stream is detached from the pipe. https://github.com/joyent/node/blob/master/lib/stream.js#L89-112
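
The detaching can be observed in isolation with a small sketch. The stream names here are placeholders rather than my real pipeline, and the error is emitted by hand purely to trigger the behaviour: the destination's 'unpipe' event fires as soon as 'error' is emitted on it.

```javascript
var stream = require('stream');

// Placeholder source and destination, both in object mode.
var source = new stream.PassThrough({ objectMode: true });
var dest = new stream.Writable({
  objectMode: true,
  write: function (chunk, encoding, callback) { callback(); }
});

var unpiped = false;
dest.on('unpipe', function () { unpiped = true; });
// Own 'error' listener so the error is handled and does not crash the process.
dest.on('error', function (err) { console.log('error seen:', err.message); });

source.pipe(dest);

// Emitting 'error' on the destination runs the listener that .pipe installed,
// which unpipes the destination from the source.
dest.emit('error', new Error('boom'));

console.log('unpiped:', unpiped);
```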

This seems a bit aggressive for what I want, though. I'd like to make a note of the error but keep the pipeline going. How can I do this?

My stream looks like this:

client.readStream()
        .pipe(process1)
        .pipe(process2)
        .pipe(process3)
        .pipe(mongo.writeStream())
        .on('data', console.log)
        .on('error', console.log);

process1, process2, and process3 are implemented as .map() from the event-stream library.

The mongo write stream looks like this.

var Stream = require('stream');
var util = require('util');

// Writable stream (object mode) that saves each incoming model.
function MongoStream(_mongo) {
  Stream.Writable.call(this, { objectMode : true });
}

util.inherits(MongoStream, Stream.Writable);

MongoStream.prototype._write = function (model, encoding, callback) {
  var self = this;
  console.log("Saving model");
  model.save(function (err, result) {
    if (err) {
      // Emitting 'error' here is what makes .pipe() detach this stream.
      self.emit('error', err);
    } else {
      console.log("model saved");
      self.emit('data', result);
    }
    callback();
  });
};

The read stream is simply a .find().stream() from a mongoose model.


Use a name other than error for the event:

self.emit('error', err);

could be, say:

self.emit('err', err);

That wouldn't trigger the 'error' listener that .pipe installs.

Then listen for it with: .on('err', console.log)

This might be better than attempting to change .pipe so that it does nothing on error.
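
For illustration, here is a minimal, self-contained sketch of that approach. It is not your actual pipeline: TolerantMongoStream and fakeModel are made-up names, fakeModel is a hypothetical stand-in for a mongoose model whose save() calls back synchronously to keep the example short, and a PassThrough stands in for the real read stream.

```javascript
var stream = require('stream');
var util = require('util');

// Writable sink that reports save failures on a custom 'err' event,
// so the 'error' listener installed by .pipe() is never triggered.
function TolerantMongoStream() {
  stream.Writable.call(this, { objectMode: true });
}
util.inherits(TolerantMongoStream, stream.Writable);

TolerantMongoStream.prototype._write = function (model, encoding, callback) {
  var self = this;
  model.save(function (err, result) {
    if (err) {
      self.emit('err', err);      // custom event name instead of 'error'
    } else {
      self.emit('data', result);  // same success event as in the question
    }
    callback();                   // no error argument: keep accepting writes
  });
};

// Hypothetical stand-in for a mongoose model (assumption, not a real API).
function fakeModel(name, shouldFail) {
  return {
    save: function (cb) {
      if (shouldFail) { cb(new Error('could not save ' + name)); }
      else { cb(null, name); }
    }
  };
}

// Pipes three fake models into the sink and reports what happened on 'finish'.
function runDemo(done) {
  var source = new stream.PassThrough({ objectMode: true });
  var sink = new TolerantMongoStream();
  var failures = [];
  var saved = [];
  sink.on('err', function (err) { failures.push(err.message); });
  sink.on('data', function (result) { saved.push(result); });
  sink.on('finish', function () { done(failures, saved); });

  source.pipe(sink);
  source.write(fakeModel('a', false));
  source.write(fakeModel('b', true));   // this save fails
  source.write(fakeModel('c', false));  // still reaches the sink afterwards
  source.end();
}

runDemo(function (failures, saved) {
  console.log('saved:', saved);
  console.log('failed:', failures);
});
```

Under these assumptions, 'c' is still saved after 'b' fails, because nothing is ever emitted under the name 'error'. The trade-off is that anything listening for the standard 'error' event on this stream will no longer hear about failed saves, so the 'err' listener has to be attached explicitly.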
