
How to pipe and tune a bulk operation made up of multiple async tasks using RxJS?


For example:

  • There are 100 products in the cart, each with a quantity of 1-5, and the user clicks "Place order"
  • While the order is being created, the following tasks take place:
    1. Check product availability, taking products one at a time
    2. Check for and apply discounts, taking 10 products at a time
    3. Apply applicable taxes, taking 50 products at a time
    4. Calculate the order total once the computation for all products has finished
    5. Create the order

Note: There is no limit on how many items can be added to the cart, and similar tasks run whenever a product is added to the cart.


Solution

  • I'm not entirely sure what you are trying to accomplish, but my guess is that the following is what you're after:

    Assuming the following are your models:

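    // Declared as interfaces: they are only used as types here, and classes
    // with uninitialized fields do not compile under TypeScript's strict mode.
    interface Product {
      id: number;
      name: string;
    }
    
    interface ProductOrder {
      product: Product;
      quantity: number;
      discount: number;
      taxes: number;
    }
    
    interface Order {
      productOrder: ProductOrder[];
      total: number;
    }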
    

    Consider writing a function for each step, where each function's input is the previous function's output:

    /**
     * Check product availability, 
     * if success return the same @productOrder 
     * else throw error
     */
    declare function checkProductAvailabilty(productOrder: ProductOrder): Observable<ProductOrder>;
    
    /**
     * Check and apply discounts on products,
     * apply changes to @productOrder 
     * if success return the same @productOrder 
     * else throw error
     */
    declare function checkAndApplyDiscount(productOrder: ProductOrder): Observable<ProductOrder>;
    
    /**
     * Apply applicable taxes on products, 
     * apply changes to @productOrder 
     * if success return the same @productOrder 
     * else throw error
     */
    declare function applyApplicableTaxes(productOrder: ProductOrder): Observable<ProductOrder>;
    
    /**
     * Calculate order total, 
     * if success return @order
     * else throw error
     */
    declare function calculatOrderTotal(productOrder: ProductOrder[]): Observable<Order>;
    
    /**
     * Create order, 
     * if success return
     * else throw error
     */
    declare function createOrder(order: Order): Observable<void>;
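
To make the contract of these functions concrete, here is a minimal sketch of what one of them could look like. The in-memory `stock` map and the sample products are assumptions made up for illustration; a real implementation would presumably call a backend. The factory form of `throwError` assumes RxJS 7.

```typescript
import { Observable, of, throwError } from 'rxjs';

interface Product { id: number; name: string; }
interface ProductOrder {
  product: Product;
  quantity: number;
  discount: number;
  taxes: number;
}

// Hypothetical in-memory stock levels keyed by product id (illustration only).
const stock = new Map<number, number>([
  [1, 5], // product 1: five units left
  [2, 0], // product 2: sold out
]);

// Emits the same productOrder when enough stock exists, otherwise errors.
function checkProductAvailabilty(productOrder: ProductOrder): Observable<ProductOrder> {
  const available = stock.get(productOrder.product.id) ?? 0;
  return available >= productOrder.quantity
    ? of(productOrder)
    : throwError(() => new Error(`Product ${productOrder.product.id} is unavailable`));
}

// Usage: collect emitted values and errors to see both outcomes.
const passed: ProductOrder[] = [];
const failures: Error[] = [];
const observer = {
  next: (po: ProductOrder) => passed.push(po),
  error: (err: Error) => failures.push(err),
};

checkProductAvailabilty({ product: { id: 1, name: 'Pen' }, quantity: 2, discount: 0, taxes: 0 })
  .subscribe(observer);
checkProductAvailabilty({ product: { id: 2, name: 'Ink' }, quantity: 1, discount: 0, taxes: 0 })
  .subscribe(observer);
```

Because a failure is delivered as an error notification, a single unavailable product terminates the whole pipeline unless the error is caught closer to the step that raised it.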
    

    With the functions above in place, all that is left is to wire them up:

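    import { from, Observable } from 'rxjs';
    import {
      concatMap,
      mergeMap,
      mergeAll,
      toArray,
      bufferCount,
      catchError,
    } from 'rxjs/operators';
    
    // The product orders taken from the cart
    declare const productOrders: ProductOrder[];
    
    // Handles an error thrown by any step; catchError expects an Observable back
    declare function errorHandler(error: unknown): Observable<never>;
    
    from(productOrders)
      .pipe(
        concatMap(checkProductAvailabilty), // one product at a time
        bufferCount(10),                    // wait for 10 checked products
        mergeAll(),                         // flatten the batch back into items
        concatMap(checkAndApplyDiscount),
        bufferCount(50),                    // wait for 50 discounted products
        mergeAll(),
        concatMap(applyApplicableTaxes),
        toArray(),                          // collect all products once every step is done
        mergeMap(calculatOrderTotal),
        mergeMap(createOrder),
        catchError(errorHandler)
      )
      .subscribe(); // nothing runs until the pipeline is subscribed to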
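
Note that in the pipeline above each buffer is flattened again by `mergeAll()` before it reaches `concatMap`, so discounts and taxes are still computed one product at a time; the buffers only hold items back until a batch is full. If the goal is to work on a whole batch concurrently, one possible variant is to hand each batch to `forkJoin`. This is only a sketch: the `inBatches` helper, the simplified `ProductOrder` shape, and the stubbed step functions are assumptions for illustration, not part of the original answer.

```typescript
import { Observable, from, of, forkJoin } from 'rxjs';
import { bufferCount, concatMap, mergeAll, toArray } from 'rxjs/operators';

// Simplified stand-in for the ProductOrder model (illustration only).
interface ProductOrder { id: number; discount: number; taxes: number; }

// Hypothetical stubs for the declared step functions; real ones would be async.
const checkAndApplyDiscount = (po: ProductOrder): Observable<ProductOrder> =>
  of({ ...po, discount: 1 });
const applyApplicableTaxes = (po: ProductOrder): Observable<ProductOrder> =>
  of({ ...po, taxes: 2 });

// Custom operator: group items into batches of `size`, run `step` on every item
// of a batch concurrently, wait for the whole batch, then emit its items in order.
function inBatches<T>(size: number, step: (item: T) => Observable<T>) {
  return (source: Observable<T>): Observable<T> =>
    source.pipe(
      bufferCount(size),
      concatMap((batch) => forkJoin(batch.map(step))), // one batch at a time
      mergeAll(), // flatten T[] back into individual items
    );
}

// 120 sample product orders, so the last batch of each step is a partial one.
const productOrders: ProductOrder[] = Array.from({ length: 120 }, (_, i) => ({
  id: i,
  discount: 0,
  taxes: 0,
}));

const processed: ProductOrder[] = [];
from(productOrders)
  .pipe(
    inBatches(10, checkAndApplyDiscount),
    inBatches(50, applyApplicableTaxes),
    toArray(),
  )
  .subscribe((all) => processed.push(...all));
```

`forkJoin` only emits once every inner observable has emitted at least one value and completed, so this variant assumes each step function behaves like a single request. `concatMap` keeps the batches in order; swapping it for `mergeMap` with a concurrency limit would let several batches run in parallel, which is one of the knobs available for tuning.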