websocket, rxjs, reactivex, combinelatest, subject-observer

RxJS: make combineLatest with inner WebSocket Subjects an Observable


My code below works well.

But subscribing to combineLatest inside the map operator of another observable (fromArray$) feels weird.

How can I make the combineLatest result an observable in its own right?

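import { combineLatest, of } from "rxjs";
import { filter, map } from "rxjs/operators";
import { webSocket } from "rxjs/webSocket";

const item = ["television", "pc", "radio", "camera"];
const fooMarket = webSocket("wss://api.fooMarket.com/websocket/");
const barMarket = webSocket("wss://api.barMarket.com/websocket/");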

// Each market WebSocket stream emits messages like the one below
// (real-time price changes, one item per message, items arriving in arbitrary order)
// {
//   itemName: "television",
//   price: 980
// }

const fromArray$ = of(...item).pipe(
  map((x) => {
    // Keep only this item's messages from each market.
    const fooItem = fooMarket.pipe(
      filter((y) => y.itemName === x),
      map((z) => ({ itemName: x, price: z.price }))
    );
    const barItem = barMarket.pipe(
      filter((y) => y.itemName === x),
      map((z) => ({ itemName: x, price: z.price }))
    );

    // Nested subscription: this is the part that feels wrong.
    combineLatest({ [`foo-${x}`]: fooItem, [`bar-${x}`]: barItem }).subscribe(
      console.log
    );
    // return combineLatest({ [`foo-${x}`]: fooItem, [`bar-${x}`]: barItem }).subscribe(console.log); // this makes fromArray$ emit 'Subscription' values (not 'Observable')
  })
);

fromArray$.subscribe();

// The result looks like this:
// {
//   "foo-television": { itemName: "television", price: 980 },
//   "bar-television": { itemName: "television", price: 950 }
// }
// {
//   "foo-pc": { itemName: "pc", price: 110 },
//   "bar-pc": { itemName: "pc", price: 120 }
// }
// ...and so on, as prices change in real time

Solution

  • Your question is not completely clear to me, but I think you want to get rid of the nested subscriptions. (Please correct me if I'm wrong.)

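    For that, you just need to replace the map operator with mergeMap and return the combineLatest observable from the projection function. mergeMap subscribes to each returned observable for you and forwards its emissions, so fromArray$ stays an Observable.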

    Like this:

    const fromArray$ = of(...item).pipe(
      mergeMap((x) => {
        // ... (fooItem and barItem are built here)
    
        return combineLatest({ [`foo-${x}`]: fooItem, [`bar-${x}`]: barItem });
      })
    );
    
    fromArray$.subscribe();
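
To try the mergeMap version without a live server, here is a minimal sketch that swaps the two webSocket(...) subjects for plain Subjects. The PriceTick interface, the shortened item list, and the received array are assumptions added for illustration; they are not part of the original code.

```typescript
import { Subject, combineLatest, of } from "rxjs";
import { filter, mergeMap } from "rxjs/operators";

// Assumed message shape, taken from the comment in the question.
interface PriceTick {
  itemName: string;
  price: number;
}

// Stand-ins for webSocket(...): plain Subjects we can push test ticks into.
const fooMarket = new Subject<PriceTick>();
const barMarket = new Subject<PriceTick>();

const items = ["television", "pc"];

const livePrices$ = of(...items).pipe(
  // mergeMap subscribes to each returned combineLatest and forwards its values.
  mergeMap((name) => {
    const fooItem = fooMarket.pipe(filter((tick) => tick.itemName === name));
    const barItem = barMarket.pipe(filter((tick) => tick.itemName === name));
    return combineLatest({ [`foo-${name}`]: fooItem, [`bar-${name}`]: barItem });
  })
);

// Collect everything the single outer subscription receives.
const received: Record<string, PriceTick>[] = [];
const subscription = livePrices$.subscribe((pair) => received.push(pair));

fooMarket.next({ itemName: "television", price: 980 }); // nothing yet: bar side missing
barMarket.next({ itemName: "television", price: 950 }); // first television pair
fooMarket.next({ itemName: "pc", price: 110 });         // nothing yet: bar side missing
barMarket.next({ itemName: "pc", price: 120 });         // first pc pair
fooMarket.next({ itemName: "television", price: 990 }); // updated television pair

console.log(received.length); // 3
subscription.unsubscribe();
```

Note that combineLatest stays silent for an item until both markets have reported at least one price for it, which is why only three of the five ticks produce output.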
    

    EDIT: Alternative to get an array of combineLatest observables, as requested in the comments

    As an alternative, if you want an array of combineLatest observables that you can handle individually or as a whole, you could extract the mergeMap logic into a function and then map the array of items to an array of combineLatest observables.

    function livePricingFor(itemName: string) {
      const fooItem = fooMarket.pipe(
        filter((item) => item.itemName === itemName),
        map((item) => ({ itemName, price: item.price }))
      );
      const barItem = barMarket.pipe(
        filter((item) => item.itemName === itemName),
        map((item) => ({ itemName, price: item.price }))
      );
    
      return combineLatest({
        [`foo-${itemName}`]: fooItem,
        [`bar-${itemName}`]: barItem,
      });
    }
    
    ... 
    
    const items = ["television", "pc", "radio", "camera"];
    const itemsLivePricingObservables = items.map(livePricingFor); // Array of combineLatest observables
    

    With that done, to subscribe to all of them you could use the merge function

    merge(...itemsLivePricingObservables).subscribe()
    

    Or, if you just want to subscribe to one of them, you can access it via its array index

    itemsLivePricingObservables[0].subscribe();
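
The two subscription styles above can be compared side by side in a small sketch. It assumes plain Subjects in place of the real webSocket(...) subjects, a hypothetical PriceTick interface, and a shortened item list, so it runs without a server.

```typescript
import { Observable, Subject, combineLatest, merge } from "rxjs";
import { filter } from "rxjs/operators";

// Assumed message shape, taken from the comment in the question.
interface PriceTick {
  itemName: string;
  price: number;
}

// Stand-ins for the two webSocket(...) subjects.
const fooMarket = new Subject<PriceTick>();
const barMarket = new Subject<PriceTick>();

function livePricingFor(itemName: string): Observable<Record<string, PriceTick>> {
  // One reusable filter that keeps only this item's ticks.
  const forItem = filter((tick: PriceTick) => tick.itemName === itemName);
  return combineLatest({
    [`foo-${itemName}`]: fooMarket.pipe(forItem),
    [`bar-${itemName}`]: barMarket.pipe(forItem),
  });
}

const items = ["television", "pc"];
const itemsLivePricingObservables = items.map(livePricingFor);

// Subscribe to every item at once...
const fromAll: Record<string, PriceTick>[] = [];
const allSub = merge(...itemsLivePricingObservables).subscribe((pair) => fromAll.push(pair));

// ...and, independently, to the first item only.
const fromFirst: Record<string, PriceTick>[] = [];
const firstSub = itemsLivePricingObservables[0].subscribe((pair) => fromFirst.push(pair));

fooMarket.next({ itemName: "television", price: 980 });
barMarket.next({ itemName: "television", price: 950 }); // reaches both subscribers
fooMarket.next({ itemName: "pc", price: 110 });
barMarket.next({ itemName: "pc", price: 120 });         // reaches only the merged one

console.log(fromAll.length, fromFirst.length); // 2 1

allSub.unsubscribe();
firstSub.unsubscribe();
```

Because the keys carry the item name (foo-pc, bar-pc, and so on), a consumer of the merged stream can still tell which item each emission belongs to.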
    

    cheers