Cancelling expand if other event occurs


I'm new to ReactiveX and have been trying to build a "CS:GO bomb explosion notifier" program using RxJS.

I have the following so far:

// TimeSubject is the current heartbeat of the game (i.e. current client time),
// it is not emitted every 1 second (it will be emitted at least once every 30
// seconds and also whenever any other game event occurs)

// RoundSubject stores information about the current and previous round state
combineLatest([TimeSubject, RoundSubject])
    .pipe(
        // detect when the bomb is planted
        filter(([_, { current }]) => current.bomb === 'planted'),
        // create a timer object to keep track of the time until explosion
        map(([time, _]) => ({ plantTime: time, explosionTime: time + 40, currentTime: time })),
        // ignore other "bomb planted" events for the next 50 seconds
        throttleTime(50 * 1000),
        // count down until bomb is exploded
        // problem: RoundSubject can emit an event indicating the bomb got defused
        //          or the round ended before the bomb got to explode,
        //          but how do I catch that since I am throttling the events from that channel?
        expand(({ plantTime, explosionTime, currentTime }) =>
            explosionTime > currentTime
                ? of(({ plantTime, explosionTime, currentTime: currentTime + 1 }))
                    .pipe(delay(1000))
                : EMPTY)
    ).subscribe(({ plantTime, explosionTime, currentTime }) => {
        if (plantTime === currentTime)
            console.log('Bomb planted and will explode in 40 seconds!');
        else if (explosionTime >= currentTime) {
            const secondsToExplode = explosionTime - currentTime;
            console.log(`.. explodes in: ${secondsToExplode} seconds`);
        }
    });

The problem here is that RoundSubject can emit an event like RoundEnded or Defused, which in either case should cancel the timer.

At the moment I don't know enough about the available operators to see how I can fix this in a good way. Additionally, I feel that my code is quite convoluted with the expand, so if you know a better approach, let me know :-).

Thanks.


Solution

  • When in doubt, name things!

    By far the biggest pitfall in writing code with RxJS is writing observables with 5+ operators in a single pipe. It's very easy to lose the plot.

    Don't be afraid to create multiple named streams; things will read much more naturally.

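    // This creates a stream that emits every time the bomb's status changes to the
    // provided value.
    const bombChangedStatusTo = (status) =>
      RoundSubject.pipe(
        // extract the bomb status itself, so the filter below compares strings
        map(({ current }) => current.bomb),
        // only react to changes, not to every RoundSubject emission
        distinctUntilChanged(),
        filter((bombStatus) => bombStatus === status)
      );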
    
    const bombPlanted$ = bombChangedStatusTo('planted');
    const bombDefused$ = bombChangedStatusTo('defused');
    

    The other answer is correct: expand is overkill here. Assuming we know the start time, the countdown can be as simple as mapping over the values emitted by some interval (see the last section for why we don't actually need plantTime here).

    // we use share() since we'll subscribe to this more than once; it
    // ensures that every subscriber sees the exact same interval
    const clockInterval$ = interval(1000).pipe(
      startWith(null), // emit immediately instead of after 1s
      map(() => Math.floor(Date.now()/1000)),
      share()
    );
    
    const countDown = (startTime) =>
      clockInterval$.pipe(
        map((currentTime) => ({
          explosionTime: startTime + 40,
          currentTime
        })),
        takeWhile(
          ({ currentTime, explosionTime }) => currentTime < explosionTime,
          true // include the emission that triggered completion
        )
      );
    

    Here we use exhaustMap to ensure that only one timer runs at a time: while a countdown is active, further "bomb planted" emissions are ignored (see the docs). There is no need for throttleTime, which would give us two timers counting to 40 instead of just one.

    const bombClock$ = bombPlanted$.pipe(
      withLatestFrom(clockInterval$), // <-- reusing the shared clock
      exhaustMap(([_, plantTime]) =>
        countDown(plantTime).pipe(
          takeUntil(bombDefused$) // stop the timer if the bomb is defused
        )
      )
    );
    

    If we trigger the "bomb was planted" side effect using bombPlanted$, we no longer need to pass around plantTime as a property on the bombClock$ value:

    bombPlanted$.subscribe(() => {
      console.log('Bomb planted and will explode in 40 seconds!');
    });
    
    bombClock$.subscribe(({ explosionTime, currentTime }) => {
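      // strict comparison, so the final tick (currentTime === explosionTime)
      // falls through to the "exploded" branch
      if (explosionTime > currentTime) {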
        const secondsToExplode = explosionTime - currentTime;
        console.log(`.. explodes in: ${secondsToExplode} seconds`);
      } else {
        console.log('The bomb has exploded');
      }
    });