I want to create an observable from a tree. Each node generates an observable that depends on the observable of its parent (e.g. via switchMap). At each level of the tree, the observables of the children need to be merged.
I want to convert this code so that it uses observables:
interface Request {
  name: string;
  childrens?: Request[];
}
start() {
  this.nextRequests(this.requests);
}
// parentResponse is the response of the parent request; it is undefined for root requests.
private nextRequests(requests: Request[], parentResponse?: any) {
  requests.forEach(request => {
    this.socketService.message(request.name, (res) => {
      if (request.childrens) {
        this.nextRequests(request.childrens, res);
      }
    });
  });
}
So this is what I thought it would look like:
interface Request {
  name: string;
  childrens?: Request[];
}
// This isn't right.
// SocketService.message returns an observable that emits the response to the request.
start(requests: Request[]): Observable<any> {
  return from(requests).pipe(switchMap(request => {
    return this.socketService.message(request.name);
  }), concatMap(request => {
    if (request.childrens) {
      return this.start(request.childrens);
    }
    return of(request.name);
  }));
}
const mock: Request[] = [
  {
    name: 'test1',
    childrens: [
      { name: 'test4' },
      { name: 'test5' }
    ]
  },
  { name: 'test2' },
  { name: 'test3' }
];
this.start(mock).subscribe((name) => {
  console.log(`Request ${name} done`);
}, err => { }, () => {
  console.log('Complete');
});
The output should look like this:
Request test1 done
Request test2 done
Request test3 done
Request test4 done
Request test5 done
Complete
But instead I get this:
Request test2 done
Request test3 done
Request test4 done
Request test5 done
The order of emission between sibling nodes is not important. But parents with children (test1) are not printed, and the complete callback is never executed. Why is that?
Thanks!
The main problem here is that you are not passing the test1 element along in
if (request.childrens) {
  return this.start(request.childrens);
}
It should look like this:
if (request.childrens) {
  return this.start([{ name: request.name }, ...request.childrens]);
}
By doing so you will receive test1 in the subscription as the first value. Otherwise it is never logged, because the branch for a request with children returns only the children's stream and never emits the parent's name.
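Here is a minimal sketch of the same idea that does not send the parent through start a second time: it emits the parent's name once its response arrives and then recurses into the children. The message function below is a hypothetical stand-in for socketService.message that emits one response and completes. The take(1) is there on the assumption that the real socket stream may stay open; mergeMap only completes once every inner observable has completed, so an open stream would keep the complete callback from firing.

```typescript
import { Observable, concat, from, of } from 'rxjs';
import { mergeMap, take } from 'rxjs/operators';

interface Request {
  name: string;
  childrens?: Request[];
}

// Hypothetical stand-in for socketService.message (an assumption, not the real service).
function message(name: string): Observable<string> {
  return of(`response for ${name}`);
}

function start(requests: Request[]): Observable<string> {
  return from(requests).pipe(
    // Siblings are merged, so their relative order is not guaranteed.
    mergeMap(request =>
      message(request.name).pipe(
        take(1), // finish the inner stream after the first response
        mergeMap(() =>
          request.childrens
            // Emit the parent first, then everything below it.
            ? concat(of(request.name), start(request.childrens))
            : of(request.name)
        )
      )
    )
  );
}

const mock: Request[] = [
  { name: 'test1', childrens: [{ name: 'test4' }, { name: 'test5' }] },
  { name: 'test2' },
  { name: 'test3' }
];

const seen: string[] = [];
let completed = false;
start(mock).subscribe({
  next: name => seen.push(name),
  complete: () => { completed = true; }
});
```

With this synchronous stand-in all five names are delivered, test1 included, and the complete callback fires. With a real socket, the order of siblings depends on when the responses arrive.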
There is also a BFS-ish demo in the link below.
The code looks like this:
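import { concat, from, of } from 'rxjs';
import { mergeMap } from 'rxjs/operators';

// Emits every request object; a parent is always emitted before its children.
function startBFS(requests) {
  return from(requests).pipe(
    mergeMap(x => {
      // The interface in the question names this property "childrens".
      if (x.childrens) {
        return concat(of(x), startBFS(x.childrens));
      } else {
        return of(x);
      }
    })
  );
}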
Here is a live demo on CodeSandbox.
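With synchronous sources, the recursive version above emits a parent's children before it moves on to the next sibling, so it behaves more like a depth-first walk. If strict level-by-level order matters, as in the expected output in the question, one option is the expand operator. The sketch below only walks the tree and leaves out the socket calls; levelOrder is a hypothetical name and is not part of the linked demo.

```typescript
import { EMPTY, Observable, from, of } from 'rxjs';
import { expand, map, mergeMap } from 'rxjs/operators';

interface Request {
  name: string;
  childrens?: Request[];
}

// Hypothetical helper: emits request names one tree level at a time.
function levelOrder(requests: Request[]): Observable<string> {
  return of(requests).pipe(
    // Feed each level back in to compute the next one; stop at an empty level.
    expand(level => {
      const next = level.reduce<Request[]>(
        (acc, r) => acc.concat(r.childrens || []),
        []
      );
      return next.length > 0 ? of(next) : EMPTY;
    }),
    // Flatten each level into its individual requests.
    mergeMap(level => from(level)),
    map(request => request.name)
  );
}

const mock: Request[] = [
  { name: 'test1', childrens: [{ name: 'test4' }, { name: 'test5' }] },
  { name: 'test2' },
  { name: 'test3' }
];

const order: string[] = [];
levelOrder(mock).subscribe(name => order.push(name));
console.log(order.join(', '));
```

For the mock tree this yields test1, test2, test3 first and then test4, test5. To make each level wait for its parents' responses, the socket call would have to happen inside the expand callback, which this sketch does not attempt.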