Search code examples
angularrxjsrxjs6rxjs-observablesrxjs-pipeable-operators

how to use combineLatest with a Subscriber


I want to use combineLatest with a subscriber and only subscribe through this method if only the id passed matches the itemIds.

Here is what i've done so far:

export interface Instance {
  onEvent<T = any>(eventName: string): Subscriber<T>;
}

export interface Payload {
  id: string;
  quantity: number;
}

public async getSingletonSubscription(id: string): Promise<Observable<any>>{
  const instance = await this.instance;
  // itemids is an Observable coming from another method. Just did an array for this example
  const itemIds: Array<number> = [1, 2];

  // Subscriber<any>
  const subscriber = instance.onEvent(message);

  const observable = combineLatest(
    subscriber,
    itemIds
  ).pipe(
    filter((payload: any, ids: any) => ids.include(id))
  );

  return observable
}

I'm currently getting a type error of Argument of type 'Subscriber<any>' is not assignable to parameter of type 'ObservableInput<any> | SchedulerLike | ((...values: any[]) => unknown)' and i'm currently not sure how I can get through this type error.

Normally I would use interface.onEvent this way

this.interface
  .onEvent(message)
  .subscribe((payload: Payload)) => console.log(payload));

but since itemIds would be an observable as well. I wanted to use combineLatest to subscribe once when I return an observable


Solution

  • There's quite a few misrepresentations in your code example, but I will try to explain each one.

    Based on the following:

    this.interface
      .onEvent(message)
      .subscribe((payload: Payload)) => console.log(payload));
    

    The return type of this.interface.onEvent() is an Observable, not a Subscriber. A Subscriber is something that has an active Subscription to an Observable.

    About your method, I would avoid mixing Promises and Observables. If you're working with RxJS, it is easier to make everything an Observable. Since this.interface.onEvent(message) returns an Observable, we can leave that as is.

    Now for your updated code:

    export interface Instance {
      onEvent<T>(eventName: string): Observable<T>;
    }
    
    export interface Payload {
      id: string;
      quantity: number;
    }
    
    public getSingletonObservable(id: string): Observable<Payload>{
      const instance$ = this.instance.onEvent<Payload>(message);
      // To make this an observable, we need to wrap it inside the `of()` operator.
      const itemIds$ = of([1, 2]);
    
      const observable$ = combineLatest([
        subscriber,
        itemIds
      ]).pipe(
        filter(([payload, ids]) => ids.include(payload.id)),
        // I'm assuming we want to return the payload if it passes the previous condition.
        map(([payload, ids]) => payload)
      );
    
      return observable$;
    }
    

    I updated the typing so we're not using any (which should be avoided at all cost in TypeScript).

    I also updated some variable names, as it's common to add $ to the end of a name if it's an observable.

    You also forgot to declare where id is defined, so I added the reference that its a property of payload.