Tags: javascript, reactive-programming, rxjs, reactive-extensions-js

RxJS: Producer-consumer with abort


I've got a special producer-consumer problem in RxJS: the producer produces elements slowly, and a consumer requests elements and often has to wait for the producer. This can be achieved by zipping the produce stream with the request stream:

var produce = getProduceStream();
var request = getRequestStream();

var consume = Rx.Observable.zipArray(produce, request).pluck(0);
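To see what the zip does without pulling in RxJS, the pairing can be mimicked on plain arrays. This is only a sketch: `zipByIndex` is a hypothetical helper, not an RxJS function, and it ignores timing entirely.

```javascript
// Hypothetical helper (not part of RxJS): pairs the i-th product with the
// i-th request, which is the pairing rule zip applies to two streams.
function zipByIndex(products, requests) {
  const length = Math.min(products.length, requests.length);
  const pairs = [];
  for (let i = 0; i < length; i++) {
    pairs.push([products[i], requests[i]]);
  }
  return pairs;
}

// pluck(0) keeps only the product from each pair.
const consumed = zipByIndex(['p1', 'p2'], ['r1', 'r2', 'r3']).map(pair => pair[0]);
console.log(consumed); // [ 'p1', 'p2' ]
```

The pairing is purely positional: a request keeps its slot no matter what happens to it afterwards.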

Sometimes a request gets aborted. A produced element should only be consumed by a request that has not been aborted:

produce:  -------------p1-------------------------p2--------->
request:  --r1--------------r2---------------r3-------------->
abort:    ------a(r1)------------------a(?)------------------>
consume:  ------------------c(p1, r2)-------------c(p2, r3)-->

The first request r1 would consume the first produced element p1, but r1 is aborted by a(r1) before p1 is produced. Once p1 is produced, it is consumed as c(p1, r2) by the second request r2. The second abort a(?) is ignored because there is no unanswered request at that point. The third request r3 has to wait for the next produced element p2 and is not aborted before p2 is produced, so p2 is consumed as c(p2, r3) immediately after it is produced.

How can I achieve this in RxJS?

Edit: I created an example with a QUnit test on JSBin. You can edit the function createConsume(produce, request, abort) to try out and test your solution.

The example contains the function definition of the previously accepted answer.


Solution

  • This solution ignores aborts that don't follow an unanswered request:

    const {merge} = Rx.Observable;
    
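    // Tags each value with its source so the merged stream can tell them apart.
    // A new object is emitted per value so earlier events are never overwritten.
    Rx.Observable.prototype.wrapValue = function(wrapper) {
        wrapper = (wrapper || {});
        return this.map(function (value) {
            return Object.assign({}, wrapper, {value: value});
        });
    };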
    
    function createConsume(produce, request, abort) {
      return merge(
                produce.wrapValue({type: 'produce'}),
                request.wrapValue({type: 'request'}),
                abort.wrapValue({type: 'abort'})
             )
             .scan(
                // initial state: no pending request, no queued products
                [false, []],
                ([isRequest, products], e) => {
                    // if the previous state answered a pending request
                    if (isRequest && products.length) {
                        // remove consumed product
                        products.shift();
                        // mark request as answered
                        isRequest = false;
                    }
                    if (e.type === 'produce') {
                        // save product to consume later
                        products.push(e.value);
                    } else {
                        // a request sets the flag; an abort clears it
                        isRequest = (e.type === 'request');
                    }
                    return [isRequest, products];
                }
             )
             .filter( ([isRequest, products]) => (isRequest && products.length) )
             .map( ([isRequest, products]) => products[0] ); // consume
    }
    

    The code is in the newest test on JSBin.
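The accumulator can also be checked without RxJS by feeding it the events of the marble diagram in order. The following is a minimal sketch under that assumption: `step` and `simulateConsume` are hypothetical names, the event list is hand-transcribed from the diagram in the question, and `step` returns a fresh state instead of mutating the array, which is a variation on the code above rather than the code itself.

```javascript
// Hypothetical pure version of the scan accumulator above.
// State is [isRequest, products]; each event is {type, value}.
function step([isRequest, products], e) {
  // If the previous state answered a request, drop the consumed product.
  if (isRequest && products.length) {
    products = products.slice(1);
    isRequest = false;
  }
  if (e.type === 'produce') {
    // Queue the product until a request is pending.
    products = products.concat([e.value]);
  } else {
    // A request sets the flag; an abort clears it.
    isRequest = (e.type === 'request');
  }
  return [isRequest, products];
}

// Hypothetical driver: replays events in order and collects what the
// filter/map stages would emit.
function simulateConsume(events) {
  const consumed = [];
  let state = [false, []];
  for (const e of events) {
    state = step(state, e);
    const [isRequest, products] = state;
    if (isRequest && products.length) {
      consumed.push(products[0]);
    }
  }
  return consumed;
}

// Events transcribed from the marble diagram in the question.
const timeline = [
  {type: 'request', value: 'r1'},
  {type: 'abort',   value: 'a(r1)'},
  {type: 'produce', value: 'p1'},
  {type: 'request', value: 'r2'},
  {type: 'abort',   value: 'a(?)'},
  {type: 'request', value: 'r3'},
  {type: 'produce', value: 'p2'},
];

console.log(simulateConsume(timeline)); // [ 'p1', 'p2' ]
```

Replaying the timeline yields p1 for r2 and p2 for r3, matching the consume line of the diagram; the aborted r1 and the stray abort a(?) produce nothing.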