Tags: angular, rxjs, angular-http-interceptors, sliding-window, rxjs-subscriptions

Angular - http interceptors - http rate limiter - sliding window


I have a use case where I need to limit the number of outgoing HTTP requests. Yes, I do have a rate limiter on the server side, but a limit on the number of active HTTP requests is also needed on the front end. For that reason I am attempting to implement a sliding window protocol where at any given time I will have at most n active requests.

This approach using Rxjs works fine in general, see here: https://jsbin.com/pacicubeci/1/edit?js,console,output

However, I am not clear how to use the same logic with HTTP interceptors. My attempt below fails at compile time with the following error:

Type 'Subscription' is missing the following properties from type 'Observable<HttpEvent>': _isScalar, source, operator, lift, and 114 more.(2740)

With that, how can I return an observable and maintain a queue at the HTTP interceptor at the same time? Is my approach flawed? Can I use HTTP interceptors to rate limit HTTP requests at all?

@Injectable()
export class I1 implements HttpInterceptor {
  intercept(
    req: HttpRequest<any>,
    next: HttpHandler
  ): Observable<HttpEvent<any>> {
    const modified = req.clone({ setHeaders: { "Custom-Header-1": "1" } });

    return next
      .handle(req)
      .do((ev: HttpEvent<any>) => {
        if (ev instanceof HttpResponse) {
          console.log(ev);
        }
      })
      .pipe(
        bufferTime(1000, null, 1),
        filter(buffer => buffer.length > 0),
        concatMap(buffer => of(buffer).pipe(delay(1000)))
      )
      .subscribe(console.log);
      }
    }

https://stackblitz.com/edit/angular-interceptors-npqkjp?file=app/interceptors.ts


Solution

  • If you'd like to find out more about how interceptors and the HttpClientModule work under the hood, you could check out this article: Exploring the HttpClientModule in Angular.

    Is my approach flawed? In this case, the problem is that intercept is expected to return an Observable, but because the code ends the chain with .subscribe(), it returns a Subscription instead.
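
The difference between the two return types can be seen without Angular or RxJS. The following is a minimal sketch in which ToyObservable, handle, and intercept are hypothetical stand-ins (not the real RxJS or Angular types): chaining an operator returns another lazy stream, while calling subscribe starts the work and returns only a handle.

```typescript
// Toy stand-ins, NOT the RxJS classes: just enough to show the two return types.
interface ToySubscription {
  unsubscribe(): void;
}

class ToyObservable<T> {
  constructor(private readonly producer: (emit: (value: T) => void) => void) {}

  // Roughly like pipe(tap(...)): returns a new lazy ToyObservable; nothing runs yet.
  tap(sideEffect: (value: T) => void): ToyObservable<T> {
    return new ToyObservable<T>(emit =>
      this.producer(value => {
        sideEffect(value);
        emit(value);
      })
    );
  }

  // Starts the producer and hands back a subscription handle, not a stream.
  subscribe(onValue: (value: T) => void): ToySubscription {
    this.producer(onValue);
    return { unsubscribe() {} };
  }
}

let requestsStarted = 0;
const log: string[] = [];

// Hypothetical stand-in for next.handle(req).
function handle(url: string): ToyObservable<string> {
  return new ToyObservable<string>(emit => {
    requestsStarted++;
    emit(`response for ${url}`);
  });
}

// Returns the stream itself, so the caller decides when to subscribe.
function intercept(url: string): ToyObservable<string> {
  return handle(url).tap(value => log.push(`seen: ${value}`));
}

const stream = intercept("/api/items");
const startedBeforeSubscribe = requestsStarted; // still 0: nothing has run yet
const subscription = stream.subscribe(value => log.push(`caller got: ${value}`));
```

In the question's code, the equivalent fix would be to move the logging into the pipe and drop the trailing subscribe, so that the caller of intercept receives the stream and subscribes to it.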

    To get a better understanding of why, I will paste a snippet copied from the article linked above:

    const obsBE$ = new Observable(obs => {
      timer(1000)
        .subscribe(() => {
          // console.log('%c [OBSERVABLE]', 'color: red;');
    
          obs.next({ response: { data: ['foo', 'bar'] } });
    
          // Stop receiving values!
          obs.complete();
        })
    
        return () => {
          console.warn("I've had enough values!");
        }
    });
    
    // Composing the interceptor chain
    const obsI1$ = obsBE$
      .pipe(
        tap(() => console.log('%c [i1]', 'color: blue;')),
        map(r => ({ ...r, i1: 'intercepted by i1!' }))
      );
    
    let retryCnt = 0;
    const obsI2$ = obsI1$
      .pipe(
        tap(() => console.log('%c [i2]', 'color: green;')),
        map(r => { 
          if (++retryCnt <=3) {
            throw new Error('err!') 
          }
    
          return r;
        }),
        catchError((err, caught) => {
          return getRefreshToken()
            .pipe(
              switchMap(() => /* obsI2$ */caught),
            )
        })
      );
    
    const obsI3$ = obsI2$
      .pipe(
        tap(() => console.log('%c [i3]', 'color: orange;')),
        map(r => ({ ...r, i3: 'intercepted by i3!' }))
      );
    
    function getRefreshToken () {
      return timer(1500)
        .pipe(
          map(() => ({ token: 'TOKEN HERE' })),
        );
    }
    
    function get () {
      return obsI3$
    }
    
    get()
      .subscribe(console.log)
    
    /* 
    -->
    [i1]
    [i2]
    I've had enough values!
    [i1]
    [i2]
    I've had enough values!
    [i1]
    [i2]
    I've had enough values!
    [i1]
    [i2]
    [i3]
    {
      "response": {
        "data": [
          "foo",
          "bar"
        ]
      },
      "i1": "intercepted by i1!",
      "i3": "intercepted by i3!"
    }
    I've had enough values!
    */
    

    StackBlitz demo.

    The gist is that interceptors create some sort of chain which ends with an observable that is responsible for making the actual request. This is the last node from the chain:

    return new Observable((observer: Observer<HttpEvent<any>>) => {
      // Start by setting up the XHR object with request method, URL, and withCredentials flag.
      const xhr = this.xhrFactory.build();
      xhr.open(req.method, req.urlWithParams);
      if (!!req.withCredentials) {
        xhr.withCredentials = true;
      }
      /* ... */
    })
    

    how can I return an observable and maintain a queue at the http interceptor at the same time

    I think a way to solve this is to create an interceptor that will contain the queue logic and make its intercept method return an Observable, so that it can be subscribed to:

    const queueSubject = new Subject<Observable<HttpEvent<any>>>();
    
    const pendingQueue$ = queueSubject.pipe(
      // using `mergeAll` because the Subject's values are Observables;
      // `limit` is the maximum number of requests allowed in flight at once
      mergeAll(limit),
      share(),
    );
    
    intercept (req, next) {
      // `next.handle(req)` - it's fine to do this, no request will fire until the observable is subscribed
      queueSubject.next(
        next.handle(req)
          .pipe(
            // not interested in `Sent` events
            filter(ev => ev instanceof HttpResponse),
    
            filter(resp => resp.url === req.url),
          )
      );
    
      return pendingQueue$;
    }
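
When mergeAll is given a concurrency argument, it subscribes to at most that many inner observables at a time and holds the rest until one completes. That bookkeeping is the sliding window from the question. Below is a dependency-free sketch of the same idea using plain callbacks instead of RxJS; SlidingWindow, Task, and the URLs are hypothetical names chosen for illustration.

```typescript
// A task receives a `done` callback and must call it when its work finishes.
type Task = (done: () => void) => void;

class SlidingWindow {
  private active = 0;
  private readonly waiting: Task[] = [];

  constructor(private readonly limit: number) {}

  get activeCount(): number {
    return this.active;
  }

  get waitingCount(): number {
    return this.waiting.length;
  }

  enqueue(task: Task): void {
    this.waiting.push(task);
    this.drain();
  }

  // Start waiting tasks while there is room in the window.
  private drain(): void {
    while (this.active < this.limit && this.waiting.length > 0) {
      const task = this.waiting.shift()!;
      this.active++;
      let finished = false;
      task(() => {
        if (finished) return; // ignore duplicate completion signals
        finished = true;
        this.active--;
        this.drain(); // a slot opened up, so the window slides forward
      });
    }
  }
}

// Usage: three "requests", at most two in flight.
const windowOfTwo = new SlidingWindow(2);
const started: string[] = [];
const finishers: Array<() => void> = [];

for (const url of ["/a", "/b", "/c"]) {
  windowOfTwo.enqueue(done => {
    started.push(url);
    finishers.push(done);
  });
}

const startedBeforeCompletion = [...started]; // "/c" is still waiting here
const waitingBeforeCompletion = windowOfTwo.waitingCount;

finishers[0](); // "/a" completes, which lets "/c" start
```

The third task does not start until one of the first two signals completion, which is the behaviour the queue in the interceptor relies on.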
    

    The filter operators are needed because share multicasts: every response is sent to all subscribers. Imagine you call http.get 5 times synchronously. share's Subject then has 5 subscribers, and each of them receives not only its own response but the responses of the other requests as well. So you can use filter to give each request the right response, in this case by comparing the URL of the request (req.url) with the URL from HttpResponse.url:

    observer.next(new HttpResponse({
      body,
      headers,
      status,
      statusText,
      url: url || undefined,
    }));
    

    Link for the above snippet.
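
The routing-by-URL idea can be sketched in isolation. In the following toy example (ToyResponse, interceptToy, and broadcast are hypothetical names, and no Angular or RxJS types are involved), every subscriber sees every response, and each one keeps only the response whose URL matches its own request.

```typescript
interface ToyResponse {
  url: string;
  body: string;
}

// Toy shared stream: every subscriber is called for every response,
// which mirrors what happens behind share().
const subscribers: Array<(resp: ToyResponse) => void> = [];

// Registers a caller that only wants the response for `requestUrl`.
function interceptToy(requestUrl: string, onResponse: (resp: ToyResponse) => void): void {
  subscribers.push(resp => {
    if (resp.url === requestUrl) {
      onResponse(resp); // same role as filter(resp => resp.url === req.url)
    }
  });
}

function broadcast(resp: ToyResponse): void {
  subscribers.forEach(subscriber => subscriber(resp));
}

const gotUsers: string[] = [];
const gotPosts: string[] = [];

interceptToy("/users", resp => gotUsers.push(resp.body));
interceptToy("/posts", resp => gotPosts.push(resp.body));

// Responses may arrive in any order; each caller still gets only its own.
broadcast({ url: "/posts", body: "posts payload" });
broadcast({ url: "/users", body: "users payload" });
```

One limitation follows directly from this logic: if two requests to the same URL are in flight at once, both callers match both responses, so a per-request key would be needed to tell them apart.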


    Now, why did we use share()?

    Let's see a simpler example first:

    const s = new Subject();
    
    const queue$ = s.pipe(
      mergeAll()
    )
    
    function intercept (req) {
      s.next(of(req));
      
      return queue$
    }
    
    // making request 1
    intercept({ url: 'req 1' }).subscribe();
    
    // making request 2
    intercept({ url: 'req 2' }).subscribe();
    
    // making request 3
    intercept({ url: 'req 3' }).subscribe();
    

    At this point, the Subject s should have 3 subscribers. This is because when you return queue$, you return s.pipe(...), and subscribing to that is the same as doing:

    s.pipe(/* ... */).subscribe()
    

    so, that's why the subject will have 3 subscribers at the end.

    Now let's examine the same snippet, but with share():

    const queue$ = s.pipe(
      mergeAll(),
      share()
    );
    
    // making request 1
    intercept({ url: 'req 1' }).subscribe();
    
    // making request 2
    intercept({ url: 'req 2' }).subscribe();
    
    // making request 3
    intercept({ url: 'req 3' }).subscribe();
    

    After you subscribe to request 1, share creates a Subject instance, and all subsequent subscribers belong to it instead of to the main Subject s. So s will have only one subscriber. This makes sure the queue is implemented correctly: although s has only one subscriber, it still accepts s.next() values, whose results are passed along to the other Subject (the one that comes from share()), which eventually sends the responses to all of its subscribers.
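
The subscriber counts described above can be checked with a small model. This is a simplified sketch, not the RxJS implementation: ToySubject and toyShare are hypothetical stand-ins that only track who is subscribed to what.

```typescript
type Listener<T> = (value: T) => void;

class ToySubject<T> {
  private readonly listeners = new Set<Listener<T>>();

  get subscriberCount(): number {
    return this.listeners.size;
  }

  // Returns an unsubscribe function.
  subscribe(listener: Listener<T>): () => void {
    this.listeners.add(listener);
    return () => {
      this.listeners.delete(listener);
    };
  }

  next(value: T): void {
    for (const listener of [...this.listeners]) {
      listener(value);
    }
  }
}

// Toy version of share(): a single upstream subscription is fanned out
// to many downstream listeners through an inner subject.
function toyShare<T>(source: ToySubject<T>): { subscribe(listener: Listener<T>): () => void } {
  const inner = new ToySubject<T>();
  let upstream: (() => void) | null = null;

  return {
    subscribe(listener: Listener<T>): () => void {
      const off = inner.subscribe(listener);
      if (upstream === null) {
        upstream = source.subscribe(value => inner.next(value));
      }
      return () => {
        off();
        if (inner.subscriberCount === 0 && upstream !== null) {
          upstream(); // last listener left, so drop the upstream subscription
          upstream = null;
        }
      };
    },
  };
}

// Without sharing: each subscription lands on the source itself.
const plain = new ToySubject<string>();
plain.subscribe(() => {});
plain.subscribe(() => {});
plain.subscribe(() => {});

// With sharing: the source sees a single subscriber.
const source = new ToySubject<string>();
const shared = toyShare(source);
const received: string[][] = [[], [], []];
received.forEach(bucket => shared.subscribe(value => bucket.push(value)));

source.next("resp 1");
```

Here plain ends up with 3 subscribers while source has only 1, yet all three listeners on the shared stream still receive the value pushed into source.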