Search code examples
javascriptrxjsangular-httpclient

Queue http calls in rxjs


I'm working on a session service that checks if the auth token is expired. If it is, it makes a call to refresh the token. During this request, all incoming requests should be queued, and emitted once the request is finished. After that, all incoming request can pass through without the queue until the token expires again. I draw a marble diagram for this:

1. ---a---b---c--d-----e--
2. -t-------f------t------
3. ---a---b---------cd-e--

I named 1. as the incoming$ Observable, 2. is valve$ - if it's true, requests can pass through, if it's false, they should be queued. When it turns true, the queued are fired.

What I've done so far? I think this should be done by adding an intermediate Observable called receiver$ which changes its value depending on valve$. When valve$ is true, it just return a simple subject, if it's false, it returns one that's capable of recording values.

receiver$ = valve.pipe(
  map((value) => {
    if (value) {
      return new Subject();
    } else {
      return (new Subject()).pipe(
        shareReplay(),
      );
    }
  })
);

And then every new value obtained in incoming$ should be added to the current observable in recevier$:

incoming$.pipe(
  combineLatest(receiver$),
).subscribe((incomingValue, recevier) => {
  recevier.next(incomingValue);
});

And here's the part I cannot wrap my head around. Whenever valve turns true, I'd need the last two values from receiver$. The second last would hold the queue, and the last would hold the active subject. By merging them I could achieve my goal. I don't know how to implement this and how subscriptions will be managed. Also, this looks overly complicated for such a seemingly simple use case.

What's the best way of implementing this behavior?


Solution

  • You can do this by just using concatMap that merges two different streams based on the value form valve$. Note that this requires that both valve$ and incoming$ are shared with share().

    valve$
      .pipe(
        concatMap(v => v
          ? incoming$.pipe(takeUntil(valve$))
          : incoming$
            .pipe(
              takeUntil(valve$),
              bufferCount(Number.POSITIVE_INFINITY),
              mergeAll(),
            )
        ),
      )
      .subscribe(console.log)
    

    Live demo: https://stackblitz.com/edit/rxjs6-demo-d3bsxb?file=index.ts