Search code examples
rxjsrxjs-observables

Fanning out and summing results in RxJS


The below is a mockup of something I'm trying to achieve in RxJS (summing resource counts by compliance status). It works, but is there a simpler way to write this?

const { combineLatest, of, map, switchMap } = require('rxjs');

let rules$ = of(['one', 'two', 'three']); // in the real world, this comes from an HTTP call

let complianceTypes = [
  'COMPLIANT',
  'NON_COMPLIANT',
  'NOT_APPLICABLE',
  'INSUFFICIENT_DATA',
];

const getCountsForRule$ = () => of(1); // in the real world, this comes from an HTTP call

rules$
  .pipe(
    switchMap((rules) =>
      combineLatest(
        complianceTypes.map((complianceType) =>
          combineLatest(rules.map((rule) => getCountsForRule$(rule))).pipe(
            map((counts) => ({
              complianceType,
              count: counts.reduce((total, count) => count + total, 0),
            }))
          )
        )
      ).pipe(
        map((counts) =>
          counts.reduce(
            (acc, count) => ({
              ...acc,
              [count.complianceType]: count.count,
            }),
            {}
          )
        )
      )
    )
  )
  .subscribe((res) => console.log('Got result', { res }))

Solution

  • If I understand the problem right I would proceed like this.

    First of all I would build a function that, given a rule, returns the counts for all compliance types in one shot, e.g.

    const countForRule$ = (rule) => {
      // create an array of Observables that "execute" the query for each compliance type
      const counts$ = complianceTypes.map((complianceType) => {
        return getCountsForRule$(rule, complianceType).pipe(
          map((count) => ({ count, complianceType }))
        );
      });
      // Execute the queries in parallel using forkJoin
      return forkJoin(counts$);
    };
    

    Once we have this basic building block, we can create the complete stream like this

    rules$
      .pipe(
        // rules is an array of rules and, with the from function
        // we turn it into a stream of rules
        concatMap(rules => from(rules)),
        // as soon as a rule is notified by upstream, we "execute" the query
        // for the counts of all compliance types
        concatMap((rule) => {
          return countForRule$(rule);
        }),
        // eventually we reduce all the values notified by upstream into a 
        // single result
        reduce((acc, val) => {
          console.log(val)
          val.forEach(countForType => {
            const count = countForType.count
            const type = countForType.complianceType
            acc[type] = acc[type] ? acc[type] + count : count
          })
          return acc;
        }, {})
      )
      .subscribe((res) => console.log('Got result', res));
    

    This stackblitz shows it.

    I am not sure this is simpler than your solution. I think though it is more in line with a reactive style. Up to you to decide whether this considerations are valid or not.