Search code examples
angulartypescriptrxjsangular2-observables

Angular RxJS, push newly added item to the existing Observable Array


I have 2 services. RoomService -> Returns a list of existing rooms NotificationService -> Returns a Server Sent Event (SSE) when a new room is added.

What I'm trying to do is

  • Fetch all the existing rooms
  • Await the SSE for room creation
  • When SSE is received, add this new room to the list of existing rooms.

Here is what I'm trying to do

export class RoomListComponent {
  rooms$: Observable<Room[]> = this.roomService
    .getRooms()
    .pipe(
      mergeMap((rooms) =>
        this.notificationService
          .getServerSentEvent<Room>()
          .pipe(concatMap((room) => [...rooms, room]))
      )
    );

  constructor(
    private notificationService: NotificationService,
    private roomService: RoomService
  ) {}
}

But this results in error Type 'Observable<Room>' is not assignable to type 'Observable<Room[]>'. Type 'Room' is missing the following properties from type 'Room[]': length, pop, push, concat, and 29 more.ts(2322)

What am I doing wrong?


Solution

  • Using scan to accumulate all new rooms emitted from your notificationService

    const initialRooms$ = roomService.getRooms();
    
    const allNewRooms$ = notificationService.getServerSentEvent().pipe(
      scan((acc, curr) => [...acc, curr], []),
      startWith([])
    );
    
    // all rooms
    combineLatest([initialRooms$, allNewRooms$])
      .pipe(map(([initialRooms, allNewRooms]) => [...initialRooms, ...allNewRooms]))
      .subscribe((arr) => console.log(JSON.stringify(arr)));
    

    Stackblitz