Search code examples
angular rxjs angular2-observables rxjs-observables

Observable pipe not called when source Observable is updated


I have two Observables; one depends on data from the other.

When the data on the first Observable is changed it should update the second Observable.

Unfortunately, this is not working as I expect.

Here is the primary Observable (which works fine)

/**
 * Exposes the `locale` route parameter as an observable stream.
 */
export class LocaleService {
  /**
   * Emits the `locale` route parameter each time the router reports an
   * `ActivationEnd` event. `shareReplay(1)` shares one upstream subscription
   * and replays the most recent value to late subscribers.
   */
  locale$: Observable<string>;

  constructor(private router: Router) {
    this.locale$ = router.events.pipe(
      // Type-guard predicate: keeps only ActivationEnd events and narrows the
      // element type, so `map` below needs no parameter annotation.
      // (The original referenced an undefined `envt` here.)
      filter((evnt): evnt is ActivationEnd => evnt instanceof ActivationEnd),
      map(evnt => evnt.snapshot.paramMap.get('locale')),
      shareReplay(1),
    );
  }
}

Here is the second Observable, which should update based on the value of the first Observable (seen above):

/**
 * Shape of the countries payload.
 * The real type is more complex; it is reduced to a single field here to
 * demonstrate the problem.
 */
interface ICountries {
  code: string;
}

/**
 * Loads the countries payload for the current locale and re-requests it
 * whenever `LocaleService.locale$` emits.
 */
export class DataService {
  /** Latest countries response; `shareReplay(1)` replays it to late subscribers. */
  countries$: Observable<ICountries>;

  // The parameter was spelled `localService` while the body used
  // `localeService`; the names now agree so both references resolve.
  constructor(private http: HttpClient, private localeService: LocaleService) {
    this.localeService.locale$.subscribe(locale => {
      // this is only here for debug purposes, showing it does change
      console.log('changed locale', locale);
    });

    this.countries$ = this.localeService.locale$.pipe(
      tap(locale => { console.log('http updated', locale); }), // called first time only, not on locale changes...
      // switchMap drops any in-flight request when a newer locale arrives.
      switchMap(locale => this.http.get<ICountries>(`http://localhost:8080/countries?lang=${locale}`)),
      shareReplay(1),
    );
  }
}

And, if it matters, the way I am consuming countries$ is as follows (again, simplified to demonstrate the problem):

/**
 * Renders the `code` of the countries payload once `countries$` has emitted.
 *
 * The class previously declared `implements OnInit` without defining
 * `ngOnInit`, which does not satisfy the interface; the clause is dropped
 * because all setup happens in the constructor.
 */
@Component({
  template: `<ng-container *ngIf="countries$ | async as countries">{{ countries.code }}</ng-container>`,
})
export class CountryComponent {
  /** Stream consumed by the template through the `async` pipe. */
  countries$: Observable<ICountries>;

  constructor(private dataService: DataService) {
    this.countries$ = dataService.countries$;
  }
}

I have been struggling with this for a few days now. I have looked for answers on StackOverflow and numerous other forums, and asked friends who use Observables more frequently, but I cannot seem to solve this. Any help would be greatly appreciated!


Solution

  • I am not exactly sure, but you could try splitting the observables into two separate parts. At the moment it is a single stream, like below:

    // Illustration only: a single pipe chain built on localeService.locale$
    // and assigned to this.countries.
    // NOTE(review): the tap/switchMap/shareReplay group appears twice. The
    // second group's callbacks would receive whatever the first switchMap
    // emits rather than a locale string — presumably a simplified sketch or a
    // paste artifact; confirm before reusing this snippet.
    this.countries = localeService.locale$.pipe(
      tap(locale => { console.log('http updated', locale); }), // called first time only, not on locale changes...
      switchMap(locale => this.http.get<ICountries>(`http://localhost:8080/countries?lang=${locale}`)),
      shareReplay(1),
      tap(locale => { console.log('http updated', locale); }), // called first time only, not on locale changes...
      switchMap(locale => this.http.get<ICountries>(`http://localhost:8080/countries?lang=${locale}`)),
      shareReplay(1)
    );
    

    Here is my suggestion. I've split the observable stream into two separate streams using a ReplaySubject with a buffer size of 1 (the same as the shareReplay(1) from the question).

    LocaleService

    /**
     * Publishes the `locale` route parameter through a ReplaySubject so that
     * consumers subscribe to the subject rather than to the router pipeline.
     */
    export class LocaleService {
      /** Buffer size 1: a new subscriber immediately receives the latest locale. */
      localeSource = new ReplaySubject<string>(1);
      /** Observable view of `localeSource`. */
      locale$ = this.localeSource.asObservable();

      constructor(private router: Router) {
        router.events.pipe(
          // Type-guard predicate: keeps only ActivationEnd events and narrows
          // the element type for `map`. (The original referenced an undefined
          // `envt` here.)
          filter((evnt): evnt is ActivationEnd => evnt instanceof ActivationEnd),
          map(evnt => evnt.snapshot.paramMap.get('locale'))
        ).subscribe(locale => this.localeSource.next(locale));
      }
    }
    

    DataService

    /**
     * Shape of the countries payload.
     * The real type is more complex; it is reduced to a single field here to
     * demonstrate the problem.
     */
    interface ICountries {
      code: string;
    }
    
    /**
     * Requests the countries payload for each locale emitted by
     * `LocaleService.locale$` and pushes every response into a ReplaySubject.
     */
    export class DataService {
      /** Buffer size 1: a new subscriber immediately receives the latest response. */
      countriesSource = new ReplaySubject<ICountries>(1);
      /** Observable view of `countriesSource`. */
      countries$ = this.countriesSource.asObservable();

      constructor(private http: HttpClient, private localeService: LocaleService) {
        const countriesPerLocale$ = this.localeService.locale$.pipe(
          tap(currentLocale => {
            console.log('http updated', currentLocale);
          }),
          // switchMap drops any in-flight request when a newer locale arrives.
          switchMap(currentLocale => this.requestCountries(currentLocale)),
        );

        countriesPerLocale$.subscribe(response => this.countriesSource.next(response));
      }

      /** Issues the GET request for the countries of the given locale. */
      private requestCountries(locale: string) {
        return this.http.get<ICountries>(`http://localhost:8080/countries?lang=${locale}`);
      }
    }
    

    CountryComponent

    /**
     * Renders the `code` of the countries payload once `countries$` has emitted.
     *
     * The class previously declared `implements OnInit` without defining
     * `ngOnInit`, which does not satisfy the interface; the clause is dropped
     * because all setup happens in the constructor.
     */
    @Component({
      template: `<ng-container *ngIf="(countries$ | async) as countries">{{ countries?.code }}</ng-container>`,
    })
    export class CountryComponent {
      /** Stream consumed by the template through the `async` pipe. */
      countries$: Observable<ICountries>;

      constructor(private dataService: DataService) {
        this.countries$ = this.dataService.countries$;
      }
    }