Search code examples
rxjs, directory-walk, node-walk

How to use node-walk with RxJS?


node-walk exposes an event-based API with a few events, like this:

// Handler registered for each 'file' event emitted by the walker.
walker.on('file', function onFile(root, fileStats, next) {
  // next should be called to go to the next file
  next();
});
// Handler registered for the walker's 'end' event.
walker.on('end', function onEnd() {
  // the end of the stream
});

Is it still reactive if the subscriber has to call a function to tell the source to move on to the next item in the stream? Events don't wait for the subscriber to react to them, right?

How do I transform this into an Rx Observable?


Solution

  • Your best bet is to create a wrapper around it:

    /**
     * Builds an Observable from a walker created with `walk.walk(root, options)`.
     *
     * For every 'file' event the observer receives `{ stats, root }`, and the
     * event's `next` callback is then invoked through the scheduler. The
     * observer is completed when the walker emits 'end'.
     *
     * @param {*} root - Passed through to `walk.walk` as its first argument.
     * @param {*} options - Passed through to `walk.walk` as its second argument.
     * @param {Object} [scheduler] - Scheduler used to invoke `next`; falls back
     *   to `Rx.Scheduler.currentThread` when falsy.
     * @returns {Object} The Observable produced by `Rx.Observable.create`.
     */
    Rx.Observable.fromWalk = function(root, options, scheduler) {
      const activeScheduler = scheduler || Rx.Scheduler.currentThread;

      // Packs the handler arguments of a 'file' event into one object.
      const toFileEvent = (args) => ({
        root : args[0],
        stats : args[1],
        next : args[2]
      });

      // Scheduled action: the state handed to scheduleWithState arrives as
      // the second argument, and its next() is what gets called.
      const advance = (unusedScheduler, fileEvent) => {
        fileEvent.next();
      };

      return Rx.Observable.create((observer) => {
        const walker = walk.walk(root, options);

        const fileEvents = Rx.Observable.fromEvent(walker, 'file', toFileEvent);
        const endEvents = Rx.Observable.fromEvent(walker, 'end');

        // A single disposable that covers both event subscriptions.
        return new Rx.CompositeDisposable(
          fileEvents.subscribe((fileEvent) => {
            // Emit first, then schedule the call to next().
            observer.onNext({stats : fileEvent.stats, root : fileEvent.root});
            activeScheduler.scheduleWithState(fileEvent, advance);
          }),
          endEvents.subscribe(observer.onCompleted.bind(observer))
        );
      });
    };
    

    I updated your example accordingly