
Stop RxJS HTTP Polling after HTTP success response


I have the following code, which mimics polling with HTTP requests.

  timeout:Observable<number> = timer(10000);

  startPollingStackblitz(arnId: string) {
    const poll:Observable<BuyingData[]> = of({}).pipe(
        mergeMap(_ => {
          console.log('polling...' + arnId);
          return of([]);
          // return this.service.getData(arnId);
        }),
        takeUntil(this.timeout),
        tap(_ => console.info('---waiting 2 secs to restart polling')),
        delay(2000),
        repeat(),
        tap(_ => console.info('---restarted polling')),
      );

    this.subscription = poll.subscribe((data) => {
      console.log('subscribe...')
      if (data.length > 0) {
        console.log('timeout...');
        console.log(this.timeout);// I want to stop polling immediately before timer will elapse
      }
    });
  }

I want the polling to stop sending HTTP requests (in this demo version it logs 'polling...') as soon as the server responds with data.length > 0. For some reason it keeps sending requests even after the 10000 ms timeout. How can I do that?


Solution

  • As I understand it, you have two stopping conditions:

    1. After a timeout (10 seconds)
    2. When a response meets your condition (data.length > 0)

    You can achieve this by combining takeUntil, race, and timer with a Subject, as shown below.

    import { Subject, of, timer } from "rxjs";
    import { catchError, delay, mergeMap, race, repeat, takeUntil, tap } from "rxjs/operators";

    // fakeDelayedRequest() and write() are helpers that are not shown here.
    const stopper = new Subject<void>(); // emits to stop the polling
    const poll = of({}).pipe(
      mergeMap(_ =>
        fakeDelayedRequest().pipe(
          catchError(e => {
            console.error(e);
            return of(false);
          })
        )
      ),
      tap(write),
      tap(_ => console.info("---waiting 3 secs to restart polling")),
      delay(3000),
      tap(_ => console.info("---restarted polling")),
      repeat(),
      // takeUntil must come after repeat; otherwise repeat keeps
      // resubscribing with nothing to stop it.
      takeUntil(stopper.pipe(race(timer(10000))))
    );

    poll.subscribe(_ => {
      const rnd = Math.random();
      if (rnd > 0.3) { // random stop condition
        console.log("closing !", rnd);
        stopper.next(); // emit the stop signal
      }
    });
    

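    takeUntil completes the stream as soon as its notifier observable emits a value. timer emits a value after 10 seconds. race mirrors whichever of stopper and timer emits first, so polling ends on either condition. In the code from the question, takeUntil sits before repeat, so every repetition resubscribes to it and starts a fresh 10-second timer; because of({}) completes immediately, that timer is cancelled each cycle and never fires.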
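If the stop condition depends only on the response itself, as with data.length > 0 in the question, one alternative to the Subject is takeWhile with its inclusive flag. The following is a minimal sketch under that assumption, not the approach used in the answer above. pollUntilData and fakeRequest are hypothetical names, and the 20 ms interval is only there to keep the demo short.

```typescript
import { Observable, concat, defer, of, timer } from "rxjs";
import { ignoreElements, repeat, takeUntil, takeWhile } from "rxjs/operators";

// Hypothetical helper: polls `request` until a non-empty array arrives
// or `timeoutMs` elapses, whichever happens first.
function pollUntilData<T>(
  request: () => Observable<T[]>,
  intervalMs: number,
  timeoutMs: number
): Observable<T[]> {
  // One polling cycle: run the request, then wait intervalMs before completing.
  const cycle = concat(
    defer(request),
    timer(intervalMs).pipe(ignoreElements())
  );

  return cycle.pipe(
    repeat(),
    // Both stop operators come after repeat so they end the whole loop.
    // inclusive = true lets the first non-empty response through before completing.
    takeWhile(data => data.length === 0, true),
    takeUntil(timer(timeoutMs))
  );
}

// Hypothetical fake request: empty on the first two calls, data on the third.
let calls = 0;
const fakeRequest = (): Observable<string[]> =>
  of(++calls < 3 ? [] : ["item"]);

pollUntilData(fakeRequest, 20, 1000).subscribe({
  next: data => console.log("response:", data),
  complete: () => console.log("polling stopped"),
});
```

With this shape the subscriber still receives the non-empty response, and no further request is sent once it arrives. If the decision to stop is made elsewhere (for example, from a button click), the Subject-based stopper from the answer remains the more flexible choice.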
