javascript rxjs event-stream

Using RxJS event streams for simple sequence management


I'm trying to learn how to think in event streams, and I'm trying to wrap my head around them by testing them out on some contrived scenarios. Here is the scenario I'm stuck on:

Let's say I have two buttons, A and B. A can be clicked at any point. B can also be clicked, but only if A was clicked immediately before; in other words, B can never be clicked twice in a row. Here is the corresponding marble diagram (where x means the event is ignored):

clickA$ --------o--------o-------->
                |        |
clickB$ --o--o--|--o--o--|--o--o-->
          x  x  |  |  x  |  |  x
logA$   --------o--|-----o--|----->
                   |        |       
logB$   -----------o--------o----->

The standard approach would be to store state in flags and make decisions using if/else control blocks, as shown below:

var a = document.getElementById('a');
var b = document.getElementById('b');
var flag = false; // true only when the most recent accepted click was on A

a.addEventListener('click', function(e){
  flag = true;
  console.log("A");
});
b.addEventListener('click', function(e){
  if (flag) {
    flag = false;
    console.log("B");
  }
});

I feel like there should be a clean, concise way to do this without flags or some super messy event stream merging, but it's escaping me. Is there some method in RxJS (call it someMethod()) that would let us write the following and get the desired effect?

var a = document.getElementById('a');
var b = document.getElementById('b');
var a$ = Rx.Observable.fromEvent(a, "click").map(e => "A")
var b$ = Rx.Observable.fromEvent(b, "click").map(e => "B")
           .someMethod(a$)
a$.subscribe(x => console.log(x))
b$.subscribe(x => console.log(x))

Solution

  • I'm assuming that clickB$ is a hot source, just like clickA$.

    Looks like logA$ = clickA$.

    For logB$ :

    logB$ = clickA$.flatMapLatest(function (clickA){
              return clickB$.take(1);
            });
    

    flatMapLatest maps each value from clickA$ to an inner stream, here clickB$.take(1), and emits that inner stream's values until it completes. Because of the take(1), the inner stream completes right after the first value emitted by clickB$. When a new value arrives from clickA$, flatMapLatest unsubscribes from the observable derived from the previous value and subscribes to the one derived from the new value, so each click on A re-arms exactly one click on B. That gives you the expected behaviour.
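To make the subscribe/unsubscribe behaviour described above concrete, here is a minimal sketch that models it in plain JavaScript, without RxJS. The helpers createSubject and firstBAfterEachA are hypothetical names introduced only for this illustration; they are not part of any library, and the sketch covers only the one behaviour discussed here (each A re-arms a single B), not the full operator.

```javascript
// Hypothetical stand-in for a hot event source (not the RxJS Subject).
function createSubject() {
  const listeners = new Set();
  return {
    subscribe(fn) {
      listeners.add(fn);
      return () => listeners.delete(fn); // unsubscribe handle
    },
    next(value) {
      // Iterate over a copy so listeners may unsubscribe during delivery.
      [...listeners].forEach(fn => fn(value));
    },
  };
}

// Models clickA$.flatMapLatest(() => clickB$.take(1)):
// every value from a$ (re)arms a one-shot subscription on b$.
function firstBAfterEachA(a$, b$) {
  const out$ = createSubject();
  let unsubscribeB = null;
  a$.subscribe(() => {
    if (unsubscribeB) unsubscribeB(); // drop the previous inner subscription
    unsubscribeB = b$.subscribe(value => {
      unsubscribeB();                 // take(1): stop after the first B
      unsubscribeB = null;
      out$.next(value);
    });
  });
  return out$;
}

// Replay the click sequence from the marble diagram in the question.
const clickA$ = createSubject();
const clickB$ = createSubject();
const log = [];
clickA$.subscribe(() => log.push("A"));                            // logA$
firstBAfterEachA(clickA$, clickB$).subscribe(() => log.push("B")); // logB$

["B", "B", "A", "B", "B", "A", "B", "B"].forEach(click =>
  (click === "A" ? clickA$ : clickB$).next(click)
);
console.log(log.join(" ")); // prints "A B A B"
```

Note that in RxJS 5 and later, flatMapLatest was renamed switchMap, so with a newer version the same idea would be expressed with switchMap and take(1).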

    Documentation links :