Search code examples
javascriptajaxrxjsrxjs5reactive-extensions-js

RxJS 5 task queue, continue if a task fails


Imagine that we have an HTML page that fires AJAX requests. We want to make sure that AJAX requests are executed in order. The next AJAX request won't be fired until the previous one completes or errors.

I have tried to model this via a task queue using RxJS concatMap. Each AJAX request is modeled as an Observable. Everything is working great if AJAX request completes successfully, however if it errors, then the next task in the queue is not executed.

Here is an example, that uses setTimeout() to simulate long running async tasks:

  function identity(observable) {
    return observable;
  }

  function createTaskQueue() {
    var subject= new Rx.Subject();

    subject
    .concatMap(identity)
    .onErrorResumeNext(Rx.Observable.of('error'))
    .subscribe(function(data) {
      console.log('onNext', data);
    }, 
    function(error) {
      console.log('onError', error);
    });

    return {
      addTask: function(task) {
        subject.next(task);
      }
    }
  }

  function createTask(data, delay) {
    return Rx.Observable.create(function(obs) {
      setTimeout(function() {
        obs.next(data);
        obs.complete();
      }, delay);
    });
  }

  function createErrorTask(data, delay) {
    return Rx.Observable.create(function(obs) {
      setTimeout(function() {
        obs.error('Error: ' + data);
        obs.complete();
      }, delay);
    });
  }

  var taskQueue = createTaskQueue();

  taskQueue.addTask(createTask(11, 500))
  taskQueue.addTask(createTask(22, 200));
  taskQueue.addTask(createErrorTask(33, 1000));
  taskQueue.addTask(createTask(44, 300));
  taskQueue.addTask(createErrorTask(55, 300));
  taskQueue.addTask(createTask(66, 300));

Here is an executable example: https://jsfiddle.net/artur_ciocanu/s6ftxwnf/.

When I run this code the following is printed to the console: onNext 11 onNext 22 onNext error

Which is expected, but I wonder why the other tasks like 44, 55, etc are not executed.

I am pretty sure I am doing something stupid with onErrorResumeNext() or may be the whole approach is totally wrong.

Any help is very much appreciated.


Solution

  • If you read the documentation of onErrorResumeNext,

    Continues an observable sequence that is terminated normally or by an exception with the next observable sequence or Promise.

    What that means is that when your source observable will encounter an error, it will switch to whatever you passed to onErrorResumeNext. What happens here is that Rx.of(...) terminates immediately after emitting its value. Hence the behavior you observe.

    So in short, you don't want onErrorResumeNext here.

    You could instead .catch(...) the stream which could emit an error. So, something like :

    subject
        .concatMap(obs => obs.catch(Rx.Observable.of('error')))
        .subscribe(...)