angular, typescript, rxjs, twitch, rxjs-observables

How do I use RxJS & Observables to do an http request only after multiple other events have fired?


I am having trouble combining the emitted values of two different Observables for use in an http request, and then returning the request's Observable for use by clients.

Some background: I am working on a Twitch extension. Part of their ecosystem is that extensions receive environment information through event callbacks. The ones I am interested in are located at window.Twitch.ext.onAuthorized() and window.Twitch.ext.configuration.onChanged() (if interested, see here for more details: https://dev.twitch.tv/docs/extensions/reference#helper-extensions).

When making calls to my backend, I need information from both of the above events. These will not change often (if ever), but I can't make calls until they are both available, and I want to get the most recently provided value when making calls. It looks like BehaviorSubjects would be ideal for this:

export class TwitchService {
  private authSubject: BehaviorSubject<TwitchAuth> = new BehaviorSubject<TwitchAuth>(null);
  private configSubject: BehaviorSubject<TwitchConfig> = new BehaviorSubject<TwitchConfig>(null);
  private helper: any;

  constructor(private window: Window) {
    this.helper = this.window['Twitch'].ext;
    this.helper.onAuthorized((auth: TwitchAuth) => {
      this.authSubject.next(auth);
    });
    this.helper.configuration.onChanged(() => {
      this.configSubject.next(JSON.parse(this.helper.configuration.global.content));
    });
  }

  onAuthorized(): Observable<TwitchAuth> {
    return this.authSubject.asObservable().pipe(filter((auth) => !!auth));
  }

  onConfig(): Observable<TwitchConfig> {
    return this.configSubject.asObservable().pipe(filter((config) => !!config));
  }
}

This model works well for parts of the app that subscribe to one of those two Observables. My problem is I cannot find a working way to combine them and use the latest emitted values from both to create a single-use Observable for http requests.

Here's what I have so far:

type twitchStateToObservable<T> = (auth: TwitchAuth, config: TwitchConfig) => Observable<T>;

export class BackendService {
  constructor(private http: HttpClient, private twitch: TwitchService) {}

  private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
    let subject = new Subject<T>();
    combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
      first(), // because we only want to make the http request one time
      map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
    );
    return subject.asObservable();
  }

  exampleHttpRequest(): Observable<Response> {
    return this.httpWithTwitchState((auth, config) =>
      // this.url() and this.headers() are private functions of this service
      this.http.get<Response>(this.url(config) + 'exampleHttpRequest', { headers: this.headers(auth, config)})
    );
  }
}

And then, a client of this service should be able to make http requests with this simple call, without needing to know anything about the events or care when they fire:

this.backend.exampleHttpRequest().subscribe((resp) => {
  // do stuff with resp here...but this is never hit
});

Based on my understanding, combineLatest() should emit new values whenever either of the input Observables emits a new value. However, the call f(auth, config) inside of map() is never triggered in my application. I've tested it with breakpoints, with console.logs, and by keeping an eye on the Network tab in the browser debugger tools. It's possible the call to first() is throwing it off, but I don't want to repeat the http request if the events fire again, for obvious reasons.
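
For reference, here is a minimal sketch of how combineLatest() and first() behave together once something subscribes to the result. It assumes RxJS 6 or 7; the `Auth` and `Config` interfaces, the token values, and the example URL are hypothetical stand-ins for the Twitch types above, not part of the real app.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { filter, first } from 'rxjs/operators';

// Hypothetical stand-ins for the TwitchAuth and TwitchConfig types.
interface Auth { token: string; }
interface Config { apiBase: string; }

const auth$ = new BehaviorSubject<Auth | null>(null);
const config$ = new BehaviorSubject<Config | null>(null);

// Records everything the combined stream delivers, in order.
const emitted: string[] = [];

combineLatest([
  auth$.pipe(filter((a): a is Auth => a !== null)),
  config$.pipe(filter((c): c is Config => c !== null)),
])
  .pipe(first()) // deliver one [auth, config] pair, then complete
  .subscribe({
    next: ([auth, config]) => emitted.push(`${auth.token}@${config.apiBase}`),
    complete: () => emitted.push('complete'),
  });

auth$.next({ token: 'a1' }); // config is still missing: nothing is delivered
auth$.next({ token: 'a2' }); // still waiting; a2 replaces a1 as the latest auth
config$.next({ apiBase: 'https://example.invalid/' }); // both present: one pair, then complete
auth$.next({ token: 'a3' }); // ignored, because first() has already completed

console.log(emitted);
```

In this sketch the combined stream stays silent until both sources have produced a non-null value, then delivers the latest pair exactly once.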

Thanks in advance for any advice or pointers!


Solution

  • Ok, it turns out I was missing something obvious. I was not subscribing to the Observable returned by combineLatest(...).pipe(...), and because Observables are lazy, nothing inside the pipe (including the map() callback) ever ran. After adding a subscribe() at the end, this worked fine:

    private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
      let subject = new Subject<T>();
      combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
          first(),
          map(([auth, config]) => f(auth, config).subscribe((resp) => subject.next(resp)))
      ).subscribe(); // adding this fixed the problem
      return subject.asObservable();
    } 
    

    After doing that, I did a little more research about how to un-nest subscriptions in cases like this, which led me to the flatMap() operator (an alias for mergeMap(), which is the preferred name in newer RxJS versions). So now the httpWithTwitchState() function looks like this:

    private httpWithTwitchState<T>(f: twitchStateToObservable<T>): Observable<T> {
      return combineLatest(this.twitch.onAuthorized(), this.twitch.onConfig()).pipe(
        first(),
        map(([auth, config]) => f(auth, config)),
        flatMap((resp) => resp)
      );
    }
    

    This works as expected now and is cleaner. Thanks to those who took the time to read my question and/or reply.
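
The map() followed by flatMap((resp) => resp) pair can probably be collapsed into a single mergeMap(). The following is a minimal, self-contained sketch of that idea, assuming RxJS 6 or 7. The `Auth` and `Config` interfaces, `fakeGet()` (a stub standing in for an HttpClient call), `withTwitchState()`, and the example URL are all hypothetical names introduced for illustration.

```typescript
import { BehaviorSubject, Observable, combineLatest, of } from 'rxjs';
import { filter, first, mergeMap } from 'rxjs/operators';

// Hypothetical stand-ins for the TwitchAuth and TwitchConfig types.
interface Auth { token: string; }
interface Config { apiBase: string; }

const auth$ = new BehaviorSubject<Auth | null>(null);
const config$ = new BehaviorSubject<Config | null>(null);

let requestCount = 0;

// Hypothetical stub in place of an HttpClient call; it counts how often it runs.
function fakeGet(auth: Auth, config: Config): Observable<string> {
  requestCount += 1;
  return of(`GET ${config.apiBase}example as ${auth.token}`);
}

function withTwitchState<T>(
  f: (auth: Auth, config: Config) => Observable<T>
): Observable<T> {
  return combineLatest([
    auth$.pipe(filter((a): a is Auth => a !== null)),
    config$.pipe(filter((c): c is Config => c !== null)),
  ]).pipe(
    first(), // take one [auth, config] pair, then complete
    mergeMap(([auth, config]) => f(auth, config)) // map and flatten in one step
  );
}

const request$ = withTwitchState(fakeGet);
const requestsBeforeSubscribe = requestCount; // still 0: building the pipeline runs nothing

const responses: string[] = [];
request$.subscribe((resp) => responses.push(resp));

auth$.next({ token: 'a1' });
config$.next({ apiBase: 'https://example.invalid/' }); // both present: the request fires once
auth$.next({ token: 'a2' }); // no second request, since first() has completed

console.log(requestsBeforeSubscribe, requestCount, responses);
```

One design note: because the returned Observable is cold, the request only starts when a client subscribes, so the client cannot miss the response. The intermediate Subject-based version subscribes eagerly, which means a response that arrives before the client subscribes could be lost.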