node.js, node.js-stream

How to implement a writable stream


I want to pipe data from an Amazon Kinesis stream to an S3 log or a bunyan log.

The sample works with a file write stream or stdout. How would I implement my own writable stream?

//this works
var file = fs.createWriteStream('my.log')
kinesisSource.pipe(file)

This doesn't work; it fails saying the object has no method 'on':

var stream = {}; //process.stdout works however
stream.writable = true;
stream.write =function(data){
    console.log(data);
};
kinesisSource.pipe(stream);

What methods do I have to implement for my own custom writable stream? The docs seem to indicate I need to implement 'write' and not 'on'.


Solution

  • To create your own writable stream, you have several options.
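
The plain object in the question fails because pipe() attaches event listeners to its destination, so the destination has to be an event emitter and not just an object with a write() method. A small check illustrating the difference (the variable names are only for illustration):

```javascript
var stream = require('stream');
var EventEmitter = require('events').EventEmitter;

// The object from the question: it has write() but no event methods.
var plainObject = {
  writable: true,
  write: function (data) { console.log(data); }
};

// A real Writable inherits on(), once(), emit() and so on from EventEmitter.
var realStream = new stream.Writable();

console.log(typeof plainObject.on);              // prints undefined
console.log(realStream instanceof EventEmitter); // prints true
```

Each of the approaches below produces an object that inherits from Writable, which is what provides 'on'.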

    Create your own class

    For this you'll need:

    1. To extend the Writable class.
    2. To call the Writable constructor in your own constructor.
    3. To define a _write() method in the prototype of your stream object.

    Here's an example:

    var stream = require('stream');
    var util = require('util');
    
    function EchoStream () { // step 2
      stream.Writable.call(this);
    }
    util.inherits(EchoStream, stream.Writable); // step 1
    EchoStream.prototype._write = function (chunk, encoding, done) { // step 3
      console.log(chunk.toString());
      done(); // signal that this chunk has been handled
    };
    
    var myStream = new EchoStream(); // instantiate your brand new stream
    process.stdin.pipe(myStream);
    

    Extend an empty Writable object

    Instead of defining a new object type, you can instantiate an empty Writable object and implement the _write() method:

    var stream = require('stream');
    var echoStream = new stream.Writable();
    echoStream._write = function (chunk, encoding, done) {
      console.log(chunk.toString());
      done();
    };
    
    process.stdin.pipe(echoStream);
    

    Use the Simplified Constructor API

    If you're using io.js (or Node.js 4 and later), you can use the simplified constructor API:

    var stream = require('stream');
    var writable = new stream.Writable({
      write: function(chunk, encoding, next) {
        console.log(chunk.toString());
        next();
      }
    });
    

    Use an ES6 class in Node 4+

    'use strict'; // class syntax needs strict mode on Node 4 and 5
    var stream = require('stream');
    class EchoStream extends stream.Writable {
      _write(chunk, enc, next) {
        console.log(chunk.toString());
        next();
      }
    }
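
For the use case in the question, the _write() body is where each chunk would be handed to the log. A minimal sketch using the simplified constructor, where `sendToLog` is a hypothetical stand-in for a bunyan call or an S3 upload (neither library is used here) and simply collects chunks in memory:

```javascript
var stream = require('stream');

// Hypothetical sink: replace with a real logger or uploader.
var logged = [];
function sendToLog(line, callback) {
  logged.push(line);
  callback(); // pass an Error here to report a failed write
}

var logStream = new stream.Writable({
  write: function (chunk, encoding, next) {
    // chunk is a Buffer by default; convert it before logging.
    sendToLog(chunk.toString(), next);
  }
});

logStream.on('finish', function () {
  console.log('logged ' + logged.length + ' chunks');
});

// In the question this would be kinesisSource.pipe(logStream).
logStream.write('first record');
logStream.end('second record');
```

Passing an error to next() makes the stream emit 'error', so a failure in the sink is not silently dropped.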