Search code examples
javascriptrxjsreactivex

RxJS: An Observable that takes its own old value as input


I'm trying to implement something like this:

// This obviously doesn't work, because we try to refer to `a` before it exists.
// c and m are some Observables.
const aSampled = a.pipe(
  rxjs.operators.sample(c),
  rxjs.operators.startWith(aInitial)
);
const a = m.pipe(
  rxjs.operators.withLatestFrom(aSampled),
  map(([mValue, oldAValue]) => {
    // Do something.
  }),
  rxjs.operators.startWith(aInitial)
);

Now that's obivously uncompilable nonsense, since we try to sample a before creating it, but hopefully it makes my intent clear: Every emitted value from a should depend on one of the old values previously emitted by a. Which old value of a, that's decided by when was the last time that c emitted something. It's kinda like calling pairwise on a, except that I don't want the last two values, but the latest and another from further back.

Note that if not for the startWith(aInitial) bits, this wouldn't even be a well-defined question, since the first value emitted by a would be defined cyclically, refering to itself. However, as long as the first value a is specified separately, the construction makes mathematical sense. I just don't know how to implement it in code in a clean way. My hunch is that there would be a long-winded way of doing this by writing a custom Subject of some kind, but something more elegant would be very welcome.

To make this a bit more concrete, in my use case I'm dealing with a click-and-drag-to-pan type of UI element. m is an Observable of mousemove events, c is an Observable of mousedown events. a would then be something that keeps changing based where the cursor is, and what the value of a was when the click-down happened.


Solution

  • If I understand right, the basic streams of events are mousemove and a mousedown.

    Based on these streams of events, you have to calculate a new stream a, which emits with the same frequency of mousemove but whose data are the result of some calculation based on the current position of the mouse and the value a had when the last mousedown emitted.

    So, if this is true, we can simulate mousemove and mousedown with the following Observables

    // the mouse is clicked every 1 second
    const c = interval(1000).pipe(
        tap(cVal => console.log('click', cVal))
    );
    // the mouse moves diagonally and emits every 200 ms
    const m = interval(200).pipe(
        map(mVal => [mVal, mVal]),
    );
    

    What we need is to have somehow at hand the value of the Observable a when mousedown emits. How can we get this?

    Let's assume we have a BehaviourSubject called value_of_a whose initial value is 1 and that holds the value of a. If we had such Observable, we could get the its value when mousedown emits simply like this

    const last_relevant_a = c.pipe(       // when mousedown emits
        switchMap(() => value_of_a.pipe(  // the control is switched to the Observable value_of_a
            take(1),                      // and we consider only its first value
        )),
    );
    

    With m, the mousemove Observable, and last_relevant_a we have all the Observable that we need. In fact we have just to combine their latest emissions to have all the elements we need to calculate the new value of a.

    const a = combineLatest(m, last_relevant_a)
    .submit(
       ([mouseVal, old_a_val] => {
          // do something to calculate the new value of a
       }
    );
    

    Now the only thing left to do is to make sure that value_of_a emits any value emitted by a. This can be accomplished calling next on value_of_a within the subscription of a itself.

    Stitching it all together, the solution could be something like

    const c = interval(1000).pipe(
        tap(cVal => console.log('click', cVal))
    );
    
    const m = interval(200).pipe(
        map(mVal => [mVal, mVal]),
    );
    
    const value_of_a = new BehaviorSubject<number>(1);
    
    const last_relevant_a = c.pipe(
        switchMap(cVal => value_of_a.pipe(
            take(1),
        )),
    );
    
    const a = combineLatest(m, last_relevant_a);
    
    a.pipe(
        take(20)
    )
    .subscribe(
        val => {
            // new value of a calculated with an arbitrary logic
            const new_value_of_a = val[0][0] * val[0][1] * val[1];
            // the new value of a is emitted by value_of_a
            value_of_a.next(new_value_of_a);
        }
    )
    

    Probably this is also a use case for the expand operator, but it should be investigated.