Search code examples
javascripttypescriptrxjsreactivex

Calling method on array entity and children in ReactiveX


I want to create an observable from a tree. Each node will generate an observable which depends on the observable of its parent(e.g. via switchMap). At each level of the tree, the observables for each children needs to be merged.

I want to convert this code so that it uses observables:

interface Request {
    name: string;
    childrens?: Request[];
}

start() {
  this.nextRequests(this.requests);
}

private nextRequests(requests: Requests[]) {
  requests.forEach(request => {
    this.socketService.message(request.name, (res) => {
      if (request.childrens) {
        this.nextRequests(request.childrens, res);
      }
    });
  });
}

So this is what I thought it would look like:

interface Request {
    name: string;
    childrens?: Request[];
}

// This isn't right. 
// SocketService.message return a observable, the reponse of the request.
start(requests: Request[]): Observable<any> {
  return from(requests).pipe(switchMap(request => {
      return this.socketService.message(request.name);
    }), concatMap(request => {
    if (request.childrens) {
      return this.start(request.childrens);
    }
    return of(request.name);
  }));
}

const mock: Request[] = [
    {name: 'test1', childrens: [
        { name: 'test4' },
        { name: 'test5' }
    ]},
    { name: 'test2' },
    { name: 'test3' }
];

this.start(mock).subscribe((name) => {
    console.log(`Request ${name} done`);
}, err => { }, () => {
    console.log('Complete');
});


And the output will should look like this:

Request test1 done
Request test2 done
Request test3 done
Request test4 done
Request test5 done

Complete

But instead I get this:

Request test2 done
Request test3 done
Request test4 done
Request test5 done

The order of emission between sibling nodes is not important. But parents with children are not printed(test1), and the complete callback is never executed. Why is that?

Thanks!


Solution

  • the main problem here is that you are not adding the test1 element as param to

    if (request.childrens) {
      return this.start(request.childrens);
    }
    

    It should look like

    if (request.childrens) {
      return this.start([{name:request.name},...request.childrens]);
    }
    

    By doing so you will recieve the test1 in the subscription as first value, otherwise it will never be logged, because it will be kind of filtred by the stream.

    EDIT:

    Now there is also BFS-ish demo in the link below

    The code looks like this

    function startBFS(requests) {
      return merge(from(requests)).pipe(
        mergeMap((x, i) => {
          if (x.children) {
            return concat(of(x), startBFS(x.children));
          } else {
            return of(x);
          }
        })
      );
    }

    Here is a live demo codeSandbox