javascript node.js rxjs reactive-extensions-js

RxJS bufferWithCount() not pausing for timeout


I am trying to control the inflow for a slow subscriber. I tried the following in Node.js:

var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);

var commJson = xmlNodeStream.bufferWithCount(2).publish();

var FastSubscriber = commJson.subscribe(
      function (x) { console.log('----------\nFastSub: onNext: %s', x); },
      function (e) { console.log('FastSub: onError: %s', e); },
      function () { console.log('FastSub: onCompleted'); });

var slowSubscriber = commJson.subscribe(function (x) {
    setTimeout(function () { console.log("============\nSlowsub called: ", x); }, 5000);
});

commJson.connect();

When I run the above code, I would expect the slow subscriber to pause for 5 seconds every time before the next data batch is received.

But that is not happening. After an initial 5 second delay, all data is flooded to slowSubscriber in batches of 2.

What is the right way to control the inflow so that slow subscribers can take their time (and preferably fast ones can wait for the slow ones to complete)?


Solution

  • It isn't pausing because setTimeout does not block execution; it only schedules work to run later, in this case after 5 seconds. When the next batch arrives, its callback is scheduled for 5 seconds plus some tiny delta from now. As a result, the fast and slow subscribers both receive all the data at essentially the same time, but the slow subscriber's output does not appear until 5 seconds later.

    If the slow subscriber in your actual use case really is non-blocking, you have two options for controlling the flow of events: either control the flow at the source of the messages, wherever that may be, or use one of the backpressure operators such as controlled():

    var xmlNodeStream = Rx.Observable.from([1,2,3,4,5,6,7,8,9,10,11]);
    
    var controller = xmlNodeStream.bufferWithCount(2).controlled();
    var commJson = controller.publish().refCount();
    
    var FastSubscriber = commJson.subscribe(
          function (x) { console.log('----------\nFastSub: onNext: %s', x); },
          function (e) { console.log('FastSub: onError: %s', e); },
          function () { console.log('FastSub: onCompleted'); });
    
    var slowSubscriber = commJson.subscribe(function (x) {
        setTimeout(function () { 
                    console.log("============\nSlowsub called: ", x); 
                    controller.request(1);
                   }, 5000);
    });
    
    // request() is defined on the controlled observable, not on the published stream.
    controller.request(1);
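
The request-driven idea behind controlled() can be sketched in plain JavaScript without RxJS. This is only an illustration of the pattern under stated assumptions, not how RxJS implements it: `bufferWithCount` here is a hypothetical array helper that merely borrows the operator's name, and `createControlledSource`, `pending`, `fastLog`, `slowLog`, and the 50 ms delay are all made up for the example. Batches stay queued until someone calls `request(n)`, so the slow subscriber sets the pace for everyone.

```javascript
// Hypothetical helper (not the RxJS operator): split an array into batches of `size`.
function bufferWithCount(items, size) {
  var batches = [];
  for (var i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// Hypothetical pull-based source: queued batches are released only on request(n).
function createControlledSource(batches) {
  var queue = batches.slice();
  var subscribers = [];
  return {
    subscribe: function (onNext) { subscribers.push(onNext); },
    request: function (n) {
      var delivered = 0;
      while (delivered < n && queue.length > 0) {
        var batch = queue.shift();
        subscribers.forEach(function (onNext) { onNext(batch); });
        delivered++;
      }
      return delivered; // number of batches actually released
    },
    pending: function () { return queue.length; }
  };
}

var source = createControlledSource(
  bufferWithCount([1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11], 2));
var fastLog = [];
var slowLog = [];

// Fast subscriber: handles each batch immediately.
source.subscribe(function (batch) { fastLog.push(batch); });

// Slow subscriber: simulates slow work, then asks for the next batch.
source.subscribe(function (batch) {
  setTimeout(function () {
    slowLog.push(batch);
    source.request(1); // pull the next batch only once this one is done
  }, 50);
});

source.request(1); // kick off delivery of the first batch
```

Because nothing is released until `request(1)` is called from inside the timeout callback, the fast subscriber also waits for the slow one, which matches what the question asked for.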