
RxJS: flatMap to a repeating observable


I'm trying to implement a service that exposes an observable indicating whether the app has a connection to my server: while the browser is online, we ping the server on a timer. Here is the code:

public get $connected(): Observable<boolean> {
    return this.hasInternetConnection
               .asObservable()
               .pipe(
                 distinctUntilChanged(),
                 flatMap((connected: boolean) => {
                   if (!connected) {
                     return of(connected);
                   } else {
                     return timer(5000)
                       .pipe(
                         map(() => {
                           var success = Math.random() > 0.5;
                           console.log('PING: ' + success);
                           return success;
                         })
                       );
                   }
                 })
               );
  }

hasInternetConnection is just a BehaviorSubject bound to the window online and offline events; timer emulates a ping to my API server.

The issue is that my subscription to $connected receives only the first value from the timer observable and then nothing more. After the hasInternetConnection subject changes to false and back to true, my subscription again gets the first value and then nothing. Here is what I see in the console:

PING: true
subscription tap
PING: true
PING: false
PING: true
...

How can I fix that? Thank you!


Solution

  • Full solution:

      private hasInternetConnection: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(navigator.onLine);
      private connectedSubject: BehaviorSubject<boolean> = new BehaviorSubject<boolean>(true);
      private recheckConnectionSubject: Subject<void> = new Subject<void>();
    
      constructor(
        private readonly http: HttpClient,
      ) {
        fromEvent(window, 'online')
          .pipe(takeUntil(this.destroyed))
          .subscribe(() => {
            this.hasInternetConnection.next(true);
          });
        fromEvent(window, 'offline')
          .pipe(takeUntil(this.destroyed))
          .subscribe(() => {
            this.hasInternetConnection.next(false);
          });
        merge(
          this.hasInternetConnection,
          this.recheckConnectionSubject,
        )
          .pipe(
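            // Read the current value on every emission; mapTo(...) would capture it only once, when the pipeline is built.
            map(() => this.hasInternetConnection.value),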
            switchMap((connected: boolean) => {
              if (!connected) {
                return of(connected);
              } else {
                return timer(0, 30000)
                  .pipe(
                    mergeMapTo(this.http.get(`${environment.apiRoot}/ping`, { responseType: 'text' })
                                   .pipe(
                                     map((res) => {
                                       return true;
                                     }),
                                     catchError(() => {
                                       return of(false);
                                     })
                                   )
                    ),
                  );
              }
            })
          )
          .subscribe(this.connectedSubject);
      }
    
      public get $connected(): Observable<boolean> {
        return this.connectedSubject.asObservable()
                   .pipe(
                     distinctUntilChanged(),
                   );
      }
    
      public resetTimer(): void {
          this.recheckConnectionSubject.next();
      }
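
A detail worth keeping in mind for pipelines like this one: an operator such as mapTo(x) receives x as an ordinary argument, so x is evaluated once when the pipeline is built, while map(() => x) re-reads x on every emission. The sketch below models that difference in plain TypeScript without RxJS. ValueHolder, constantMapper and lazyMapper are hypothetical names made up for illustration; they only stand in for a BehaviorSubject, mapTo and map, and are not part of any library.

```typescript
// Hypothetical stand-in for a BehaviorSubject: it only holds a current value.
class ValueHolder<T> {
  constructor(public value: T) {}
}

// Eager, like mapTo(x): the argument is evaluated once, at call time.
function constantMapper<T>(fixed: T): () => T {
  return () => fixed;
}

// Lazy, like map(() => x): the callback runs again on every call.
function lazyMapper<T>(read: () => T): () => T {
  return () => read();
}

const online = new ValueHolder<boolean>(true);

// Both mappers are created while the holder still says "online".
const eager = constantMapper(online.value);
const lazy = lazyMapper(() => online.value);

// The connection drops after the mappers were built.
online.value = false;

const eagerResult = eager(); // true: still the value captured at construction
const lazyResult = lazy();   // false: the value at the time of the call

console.log({ eagerResult, lazyResult });
```

If this model carries over, a pipeline that needs the subject's current state on each emission would read it inside a callback rather than pass it as an argument.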