Search code examples
angularobservablesubscribefork-join

Randomly subscribes to stream when using mergeMap and forkJoin


The subscribe is running randomly when userId changes. Sometimes it does, sometimes i doesn't. userId variable is coming from a dropdown selector (Choices library).

Shouldn't subscribe always run even though userId is the only stream that changes?

Is there anything obvious that I'm am breaking or doing wrong?

this.singleUserControl.valueChanges
    .mergeMap((userId: number) => {
        this.router.navigate([ '/r', 'company-admin', 'user-claims', userId.toString() ]);
        this.userRoles = [];
        this.claims = [];
        const userRolesObserv: Observable<Array<RoleWithClaims>> =
            this.claimsService.getUserRoles(userId)
                .mergeMap((roles: Array<Role>) => {
                    const roleObservables = [];

                    for (const role of roles) {
                        roleObservables.push(this.claimsService.getClaimsForRole(role.id)
                            .mapTo(claims => ({
                                role: role,
                                claims: claims,
                            } as RoleWithClaims)));
                    }

                    return forkJoin(roleObservables);
                });

        return forkJoin([
            Observable.of(userId),
            userRolesObserv,
            this.claimsService.getAllClaimsForUser(userId),
        ]);
    })
    .subscribe((data: [ number, Array<RoleWithClaims>, Array<Claim>]) => {
        this.userSelectedId = data[0];
        this.userRoles = data[1]
        this.claims = data[2];
    }, (error) => {
        console.log(error);
    });

Solution

  • try doing this:

    return (roleObservables.length) ? forkJoin(roleObservables) : Observable.of([]);
    

    forkJoin won't emit with an empty array, so in this code, the subscribe wouldn't execute for any user id that didn't have roles associated.

    In all my rx projects I have a utility function:

    emptyJoin(obs: Observable<any>[]): Observable<any[]> => (obs.length) ? forkJoin(obs) : Observable.of([]);