rxjs, observable, reactive-programming

Combining two observable sources, filtering by a property of the first observable


I have an observable that emits a list of users with the following content:

[
    {
        "id": 1,
        "name": "John",
        "status": "Active"
    },
    {
        "id": 2,
        "name": "Mary",
        "status": "Inactive"
    },
    {
        "id": 3,
        "name": "Peter",
        "status": "Inactive"
    },
    {
        "id": 4,
        "name": "Susan",
        "status": "Active"
    }
]

And I have another observable returning the extended user data:

{
    "id": 1,
    "authorizations": 20
}

I use the detail of each user on a dedicated details page, but I would also like to merge part of that detail into the users list, keeping only the users whose status is Active, to obtain the following result:

[
    {
        "id": 1,
        "name": "John",
        "status": "Active",
        "authorizations": 20
    },
    {
        "id": 4,
        "name": "Susan",
        "status": "Active",
        "authorizations": 10
    }
]

Is it possible to use a filtering operator and combine those results without using two subscriptions?

I tried the following code, but is there a better or simpler way to do it?

import { of, Observable, combineLatest } from 'rxjs';
import { filter, map, mergeAll, mergeMap } from 'rxjs/operators';

type State = 'Active' | 'Inactive';

type User = { id: number; name: string; status: State };
type UserDetail = { id: number; authorizations: number };
type UserWithAuthorizations = User & UserDetail

const users: User[] = [
    {
        "id": 1,
        "name": "John",
        "status": "Active"
    },
    {
        "id": 2,
        "name": "Mary",
        "status": "Inactive"
    },
    {
        "id": 3,
        "name": "Peter",
        "status": "Inactive"
    },
    {
        "id": 4,
        "name": "Susan",
        "status": "Active"
    }
]

const authorizations: UserDetail[] = [
  { id: 1, authorizations: 20 },
  { id: 2, authorizations: 5 },
  { id: 3, authorizations: 30 },
  { id: 4, authorizations: 10 },
];

const getAuthorizationsByUser = (userId: number): Observable<Partial<UserWithAuthorizations>> => {
  const users$ = of(users);
  const authorizations$ = of(authorizations);
  return combineLatest([users$, authorizations$]).pipe(
    map(([allUsers, allAuthorizations]) => {
      const user = allUsers.find((u) => u.id === userId);
      // `find` may return undefined, so read the field defensively
      const detail = allAuthorizations.find((a) => a.id === userId);
      return {
        ...user,
        authorizations: detail?.authorizations,
      };
    })
  );
};

const fetchUsersWithAuthorizations = () => of(users);


fetchUsersWithAuthorizations()
  .pipe(
    mergeAll<User>(),
    filter((user) => user.status === "Active"),
    mergeMap((user) => getAuthorizationsByUser(user.id))
  )
  .subscribe(console.log);

Solution

  • I think what you're after is the following:

    // utility to transform an array into a dictionary based on some key
    // I'll explain why in a second
    const toDictionary = <
      Obj extends Record<string | number | symbol, any>,
      Key extends keyof Obj
    >(
      arr: Obj[],
      key: Key
    ): Record<Obj[Key], Obj> =>
      arr.reduce((acc, curr) => {
        acc[curr[key]] = curr;
        return acc;
      }, {} as Record<Obj[Key], Obj>);
    
    const fetchUsers = () => of(users);
    const fetchAuthorizations = () => of(authorizations);
    
    combineLatest([
      fetchUsers(),
      fetchAuthorizations().pipe(
        // Transform the authorizations array into a dictionary keyed by ID,
        // so we can look up a user's authorization directly instead of
        // scanning the array once per user.
        // We do this here rather than in the final `map` so that, if
        // `fetchUsers()` were replaced by an observable that emits multiple
        // times, the dictionary is not rebuilt on every users emission,
        // only when the authorizations observable itself emits.
        map((authorizations) => toDictionary(authorizations, 'id'))
      ),
    ])
      .pipe(
        map(([users, authorizations]): UserWithAuthorizations[] =>
          users
            // filter first so inactive users are never merged
            .filter((user) => user.status === 'Active')
            .map((user) => ({
              ...user,
              authorizations: authorizations[user.id]?.authorizations,
            }))
        )
      )
      .subscribe(console.log);
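
The join performed inside that `map` can also be pulled out into a plain function, which makes it easy to check without any observable. The following is only a minimal sketch under stated assumptions: `joinActiveUsers` is a hypothetical helper name (not part of the code above), it filters before merging, and it skips users that have no matching detail rather than emitting `undefined` authorizations.

```typescript
type State = 'Active' | 'Inactive';
type User = { id: number; name: string; status: State };
type UserDetail = { id: number; authorizations: number };
type UserWithAuthorizations = User & UserDetail;

// Hypothetical helper: joins active users with their details,
// using a dictionary keyed by id for direct lookups.
function joinActiveUsers(
  users: User[],
  details: UserDetail[]
): UserWithAuthorizations[] {
  const detailById: Record<number, UserDetail | undefined> = {};
  for (const d of details) detailById[d.id] = d;

  const result: UserWithAuthorizations[] = [];
  for (const user of users) {
    if (user.status !== 'Active') continue; // inactive users are never joined
    const detail = detailById[user.id];
    if (detail === undefined) continue; // assumption: skip users without a detail
    result.push({ ...user, authorizations: detail.authorizations });
  }
  return result;
}

const joined = joinActiveUsers(
  [
    { id: 1, name: 'John', status: 'Active' },
    { id: 2, name: 'Mary', status: 'Inactive' },
    { id: 4, name: 'Susan', status: 'Active' },
  ],
  [
    { id: 1, authorizations: 20 },
    { id: 2, authorizations: 5 },
    { id: 4, authorizations: 10 },
  ]
);
console.log(joined);
```

Inside the pipeline, such a helper would be called from `map(([users, authorizations]) => joinActiveUsers(users, authorizations))`, with the authorizations passed as the raw array instead of a prebuilt dictionary.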
    


    EDIT: I saw your comment on the post only after answering the above.

    Based on the fact that you can only fetch authorizations individually, I've got 2 options for you:

    1. You want a single emission that includes all the active users with their authorizations:
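    // Requires `forkJoin` from 'rxjs'; `fetchAuthorizations(id)` is assumed to return one user's detail.
    const usersWithAuthorizations$: Observable<UserWithAuthorizations[]> =
      fetchUsers().pipe(
        mergeMap((users) =>
          // forkJoin waits for every inner request to complete, then emits one array.
          // Note: with no active users, forkJoin([]) completes without emitting.
          forkJoin(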
            users
              .filter((user) => user.status === 'Active')
              .map((user) =>
                fetchAuthorizations(user.id).pipe(
                  map((authorizations) => ({
                    ...user,
                    ...authorizations,
                  }))
                )
              )
          )
        )
      );
    


    2. You want every intermediate emission: each time the authorization for a given user arrives, we emit a partial result containing all the previous responses plus the current one, until every user has been processed:
    // Requires `mergeAll` and `scan` from 'rxjs/operators'.
    const usersWithAuthorizations$: Observable<UserWithAuthorizations[]> =
      fetchUsers().pipe(
        mergeMap((users) =>
          users
            .filter((user) => user.status === 'Active')
            .map((user) =>
              fetchAuthorizations(user.id).pipe(
                map((authorizations) => ({
                  ...user,
                  ...authorizations,
                }))
              )
            )
        ),
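        // subscribe to every inner request; results arrive in completion order
        mergeAll(),
        // emit the growing list each time a response arrives
        scan((acc, curr) => [...acc, curr], [] as UserWithAuthorizations[])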
      );
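
To make the shape of those intermediate emissions concrete, here is a minimal sketch in plain TypeScript that mimics what `scan((acc, curr) => [...acc, curr], [])` would emit. `runningLists` is a hypothetical stand-in written for illustration, not an RxJS API, and the arrival order (John first, then Susan) is an assumption, since `mergeAll` delivers responses in whatever order they complete.

```typescript
// Hypothetical stand-in for RxJS `scan`: returns every intermediate accumulator
// that `scan((acc, curr) => [...acc, curr], [])` would emit for the given inputs.
function runningLists<T>(arrivals: T[]): T[][] {
  const emissions: T[][] = [];
  let acc: T[] = [];
  for (const curr of arrivals) {
    acc = [...acc, curr]; // same accumulator step as in the scan call
    emissions.push(acc);
  }
  return emissions;
}

// Assumed arrival order of the per-user responses.
const emissions = runningLists(['John', 'Susan']);
console.log(emissions); // two emissions: ['John'], then ['John', 'Susan']
```

A subscriber to the second option would therefore see the list grow by one user per emission, whereas the `forkJoin` variant emits only the final, complete list.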
    
