angular, rxjs, observable, reactive, subject

How to send different Observable results to a Subject without breaking the observable flow by subscribing?


I edited the question to add a concise example, which I think makes it easier to understand. Note that the example is heavily simplified: normally I layer things across different components, but it is enough for the question.

Take this component. It shows the name of a fetched object and has a button to fetch the next object. To get the value of the next REST request, I know of no other way than to subscribe to the response. What I would like is something like "combineLatest" but for "the future", so that I can combine the latest values of streams that are created later.

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable } from 'rxjs';
import { HttpClient } from '@angular/common/http';

@Component({
  selector: 'my-app',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.css']
})
export class AppComponent implements OnInit {
  
  private readonly PEOPLE_API_ENDPOINT = `https://swapi.dev/api/people/`;

  private characterSubject: BehaviorSubject<any> = new BehaviorSubject<any>({ name: 'loading' });
  private currentCharacter: number = 1;

  character$: Observable<any> = this.characterSubject.asObservable();
  
  constructor(
    private http: HttpClient
  ) {}
    
  ngOnInit() {
    this.updateCurrentCharacter();
  }

  nextCharacter(): void {
    this.currentCharacter++;
    this.updateCurrentCharacter();
  }

  // I would like to avoid subscribing here. Instead, I would like
  // some operation that sends the stream's emissions to the subject,
  // so the observable chain is not broken before presentation,
  // as best practices recommend.
  private updateCurrentCharacter(): void {
    this.fetchCharacter(this.currentCharacter)
      .subscribe(
        character => this.characterSubject.next(character)
      );
  }

  private fetchCharacter(id: number): Observable<any> {
    return this.http.get(this.PEOPLE_API_ENDPOINT + `${id}/`);
  }
}

<span>{{ (character$ | async)?.name }}</span>

<button (click)="nextCharacter()">Next character</button>


Is there any way to do that, something like "emitIn(characterSubject)"? I suspect nothing like this exists, that is, a way to dynamically add a source's emissions to an existing source.


Solution

  • If I understand correctly, you have a service with a method that triggers an HTTP call, such as dataRepository.fetch(id), and several components that need to react when the response arrives.

    If so, there are many ways to handle this requirement. One is to use Subjects exposed as public properties of the service, which is what I understand you want to do.

    To achieve this, the code you have written is what you need. In other words, this is OK:

    dataRepository.fetch(id)
      .subscribe(
        newData => currentData.next(newData)
      )
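
    As a rough illustration of this pattern, here is a minimal sketch of a service that owns the Subject and exposes only its Observable side. The names Character, DataRepository, CharacterStore, current$ and load are hypothetical, and `of` stands in for the real HTTP call so the sketch runs without Angular.

    ```typescript
    import { BehaviorSubject, Observable, of } from 'rxjs';

    // Hypothetical data shape, for illustration only.
    interface Character { name: string; }

    // Hypothetical repository; `of` emits one value and then completes,
    // which is how a single HTTP response behaves.
    class DataRepository {
      fetch(id: number): Observable<Character> {
        return of({ name: `character-${id}` });
      }
    }

    class CharacterStore {
      private readonly currentData = new BehaviorSubject<Character>({ name: 'loading' });
      // Consumers only see the read-only Observable side of the Subject.
      readonly current$: Observable<Character> = this.currentData.asObservable();
      private readonly repository: DataRepository;

      constructor(repository: DataRepository) {
        this.repository = repository;
      }

      load(id: number): void {
        // Only `next` is forwarded, so the Subject stays open for later loads.
        this.repository.fetch(id).subscribe(newData => this.currentData.next(newData));
      }
    }

    const store = new CharacterStore(new DataRepository());
    const seen: string[] = [];
    store.current$.subscribe(character => seen.push(character.name));

    store.load(1);
    store.load(2);
    console.log(seen); // ['loading', 'character-1', 'character-2']
    ```

    Because only `next` is passed along, each call to load can push a new value into the same Subject.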
    

    If you also want to handle the error and complete cases, you can write it like this:

    dataRepository.fetch(id)
      .subscribe(currentData)
    

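    In this last form, you are passing currentData as the Observer parameter of subscribe. currentData, though, is not only an Observable but also an Observer, and therefore it can receive next, error and complete. Note that this forwards the complete notification too: if the source completes (as an HttpClient request does after its single response), currentData completes as well and ignores any values sent to it afterwards.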
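
    A small sketch of what passing the Subject as the Observer does in practice, assuming a plain Subject of strings and using `of` as a stand-in for a source that emits once and completes. The variable events is only there to record what arrives.

    ```typescript
    import { Subject, of } from 'rxjs';

    const currentData = new Subject<string>();
    const events: string[] = [];

    // Record every notification that reaches the Subject's subscribers.
    currentData.subscribe({
      next: value => events.push(`next:${value}`),
      complete: () => events.push('complete'),
    });

    // Passing the Subject as the Observer forwards next AND complete.
    of('first').subscribe(currentData);

    // The Subject has already completed, so this value is not delivered.
    of('second').subscribe(currentData);

    console.log(events); // ['next:first', 'complete']
    ```

    If the Subject has to survive several fetches, forwarding only `next` (the first form) keeps it open.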

    If you use a ReplaySubject, the last results are stored and replayed to components that are created after the result was emitted.
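
    A minimal sketch of that replay behavior, assuming a buffer size of 1 (keep only the most recent result). The string values are placeholders for real responses.

    ```typescript
    import { ReplaySubject } from 'rxjs';

    // Buffer size 1 is an assumption for this sketch: only the latest value is kept.
    const currentData = new ReplaySubject<string>(1);

    // Results arrive before any component has subscribed.
    currentData.next('first result');
    currentData.next('second result');

    // A late subscriber still receives the buffered value immediately.
    const lateValues: string[] = [];
    currentData.subscribe(value => lateValues.push(value));

    console.log(lateValues); // ['second result']
    ```

    A larger buffer size would replay that many of the most recent values instead.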