RxJS group by field and return new observable


I have the following interfaces and an Observable<Machine[]>. What I want to achieve is to group the machines in the Observable<Machine[]> by their symbol property and return a mapped Observable<Order[]>.

export interface Machine {
    symbol: string;
    price: number;
    amount: number;
    id: number;
}

export interface Order {
    symbol: string;
    machines: OrderMachine[];
}

export interface OrderMachine {
    price: number;
    amount: number;
    id: number;
}

I've tried to use the RxJS groupBy operator, but it seems to return the grouped arrays one by one.

machines: Machine[] = [
        { amount: 1,  id: 1, symbol: "A", price: 1 },
        { amount: 1,  id: 2, symbol: "A", price: 2 }
    ];


of(machines).pipe(
        takeUntil(this.unsubscribe),
        mergeMap(res => res),
        groupBy(m => m.symbol),
        mergeMap(group => zip(of(group.key), group.pipe(toArray()))),
        map(x => { // here I have probably wrong model [string, Machine[]]
            const orderMachines = x[1].map(y => { return <OrderMachine>{price: y.price, amount: y.amount, id: y.id }})
            return <Order>{ symbol: x[0], machines: orderMachines }  })
        );

As a result I get an Observable<Order> instead of an Observable<Order[]>.

expected result model:

orders: Order[] = [
        {   
            symbol: "A", 
            machines: [
                { amount: 1, price: 1, id: 1 },
                { amount: 1, price: 2, id: 2 }
            ]
        }
    ];

Solution

  • Here is a possible solution based on your approach, with a few changes:

    const machines = [
      { amount: 1, id: 1, symbol: "A", price: 1 },
      { amount: 1, id: 2, symbol: "A", price: 2 },
      { amount: 1, id: 3, symbol: "B", price: 3 }
    ];
    
    from(machines) // (1)
      .pipe(
        // (2)
        groupBy((m) => m.symbol),
        mergeMap((group) => group.pipe(toArray())),
        map((arr) => ({
          symbol: arr[0].symbol, // every group has at least one element
          machines: arr.map(({ price, amount, id }) => ({
            price,
            amount,
            id
          }))
        })),
        toArray(), // (3)
      )
      .subscribe(console.log);
    

    (1) I changed of(machines) to from(machines) so that the objects in machines are emitted into the stream one by one. of(machines) emits the whole array as a single value, so groupBy cannot see the individual machines unless the array is flattened first.

    (2) I removed takeUntil(this.unsubscribe) and mergeMap(res => res) from the pipe since there is no reason to have them in your example. takeUntil has no effect here because the stream is finite and synchronous. mergeMap with an identity function (res => res) flattens a stream whose values are themselves arrays or observables; with from(machines) the values are plain Machine objects, so it is no longer needed. Or do you actually need these operators in your project because you have an infinite stream of observables?

    (3) toArray() is what transforms Observable<Order> to Observable<Order[]>. It waits until the stream ends and emits all streamed values at once as an array.

    edit:

    The OP has mentioned that they need a solution that works with an infinite stream. Because toArray only emits once the source completes, the answer above would never emit anything in that scenario.

    To solve this I would avoid using groupBy from RxJS. It can be a very powerful tool in other cases where you need to split one stream into several grouped streams, but in your case you simply want to group an array, and there are easier methods for that.

    this.store.pipe(
        select(fromOrder.getMachines),
        map((arr) =>
            // (*) group by symbol
            arr.reduce((acc, { symbol, price, amount, id }) => {
                acc[symbol] = {
                    symbol,
                    machines: (acc[symbol] ? acc[symbol].machines : [])
                        .concat({ price, amount, id })
                };
                return acc;
            }, {})
        ),
    )
    .subscribe((result) => 
        // (**)
        console.log(Object.values(result))
    );
    

    (*) you could use a vanilla groupBy implementation that returns an object of the shape {[symbol: string]: Order}.

    (**) result is an object here, but you can easily convert it to an array by applying Object.values(result).
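
    If you prefer to keep the grouping out of the pipe, it could be pulled into a small typed helper. This is only a sketch: groupMachinesBySymbol is a hypothetical name (it is not part of RxJS or NgRx), and the interfaces are copied from the question. It keeps a lookup object keyed by symbol, as in (*), but collects the orders in an array as it goes, so no Object.values step is needed and the groups keep the order in which each symbol first appears.

```typescript
interface Machine {
  symbol: string;
  price: number;
  amount: number;
  id: number;
}

interface OrderMachine {
  price: number;
  amount: number;
  id: number;
}

interface Order {
  symbol: string;
  machines: OrderMachine[];
}

// Hypothetical helper, not an RxJS or NgRx API: groups a plain array by symbol.
function groupMachinesBySymbol(machines: Machine[]): Order[] {
  const bySymbol: { [symbol: string]: Order } = {}; // lookup: symbol -> its order
  const orders: Order[] = []; // result, in order of first appearance

  for (const m of machines) {
    let order = bySymbol[m.symbol];
    if (!order) {
      // First machine with this symbol: create its order and register it.
      order = { symbol: m.symbol, machines: [] };
      bySymbol[m.symbol] = order;
      orders.push(order);
    }
    // Copy only the OrderMachine fields, dropping symbol.
    order.machines.push({ price: m.price, amount: m.amount, id: m.id });
  }

  return orders;
}

const sampleOrders = groupMachinesBySymbol([
  { amount: 1, id: 1, symbol: "A", price: 1 },
  { amount: 1, id: 2, symbol: "A", price: 2 },
  { amount: 1, id: 3, symbol: "B", price: 3 },
]);

// prints "A:2,B:1"
console.log(sampleOrders.map((o) => `${o.symbol}:${o.machines.length}`).join(","));
```

    Assuming the selector emits a Machine[] on every store update, the helper could then be passed straight to the operator as map(groupMachinesBySymbol), which would give you an Observable<Order[]> that emits each time the store does.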