
Angular 10 call finalize() when nested observables complete


I want to finalize nested observables after the last item has been processed (see the Stackblitz).

finalize() should be called once the observables complete, that is, after all the numbers (11, 12, 13) have been output.

How should the observables be nested?

import { from } from "rxjs";
import { map, finalize } from "rxjs/operators";

//emit (1,2,3)
const data = [
  {
    userId: 1,
    id: 1
  },
  {
    userId: 1,
    id: 2
  },
  {
    userId: 1,
    id: 3
  }
];
const source = from(data);
//add 10 to each value
const example = source.pipe(map(val => add(val.id)));
//output: 11,12,13
const subscribe = example.subscribe(val => console.log(val));
function add(n) {
  return n + 10;
}
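As a baseline, here is a minimal sketch of the starting example with finalize() added. The events array is a hypothetical helper, there only to record the order in which things happen. With a single, non-nested stream the callback runs once, after the last value has been delivered:

```typescript
import { from } from "rxjs";
import { finalize, map } from "rxjs/operators";

// Hypothetical helper: records what happened, in order.
const events: string[] = [];

from([{ id: 1 }, { id: 2 }, { id: 3 }])
  .pipe(
    // add 10 to each id, as in the example above
    map(val => val.id + 10),
    // runs once, when the stream completes, errors, or is unsubscribed
    finalize(() => events.push("finalize"))
  )
  .subscribe(val => events.push(String(val)));

console.log(events); // ["11", "12", "13", "finalize"]
```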

The Stackblitz was a starting point.

In the following code, I want to send a message to all the users.

I want to unsubscribe and close the modal when all the messages are sent.

  submit() {
    const users = from(this.form.value.users).pipe(
      finalize(() => {
        console.log('finalize'),
        this.dismissModal()
      }),
      map(user => this.buildMessage(user))
    ).subscribe(message => {
      this.subscription = this.firestoreService.addMessage(message)
        .subscribe();
    });
  }

  ngOnDestroy(): void {
    this.subscription?.unsubscribe();
  }

  dismissModal() {
    this.dialogRef.close('modal dismissed');
  }
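To illustrate what goes wrong with the nested subscribe, here is a small sketch under stated assumptions: fakeAddMessage is a hypothetical stand-in for firestoreService.addMessage() that completes asynchronously, and events only records the order. The outer stream is built from an array, so it emits and completes synchronously, and finalize() fires before any inner observable has finished:

```typescript
import { from, timer } from "rxjs";
import { finalize, map } from "rxjs/operators";

const events: string[] = [];

// Hypothetical stand-in for firestoreService.addMessage(): emits on a later tick.
const fakeAddMessage = (message: string) => timer(0).pipe(map(() => message));

from(["alice", "bob"])
  .pipe(
    map(user => `hello ${user}`),
    finalize(() => events.push("finalize"))
  )
  .subscribe(message => {
    // Nested subscribe: the outer stream does not wait for this inner one.
    fakeAddMessage(message).subscribe(sent => events.push(`sent: ${sent}`));
  });

// The outer stream has already completed here; the inner timers have not fired yet.
console.log(events); // ["finalize"]
```

Also note that this.subscription is overwritten on every emission in the code above, so only the last inner subscription can ever be unsubscribed in ngOnDestroy().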

Currently, firestoreService.addMessage(message) logs 'addMessage' and returns, but the message is not added to the database.

  addMessage(data:Message):Observable<T> {
    console.log('addMessage');
    data.timestamp = firebase.firestore.FieldValue.serverTimestamp();
    return this.authService.currentUser$.pipe(
      switchMap(user => {
        console.log('addMessage user: ', user);
        if (user) {
          return this.firestore
            .collection<any>(user.company)
            .doc(user.licence)
            .collection<any>(message)
            .add(data)
              .then(res => {
                console.log("Message successfully added! ", res);
              })
              .catch(e => {
                console.error("Error adding message: ", e);
              });
          } else {
            return [];
          }
        }
      )
    );
  }

Solution

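  • Use a higher-order mapping operator, in this case switchMap, instead of subscribing inside subscribe().

    switchMap takes each message produced by buildMessage(user), passes it to firestoreService.addMessage(message), and subscribes to the returned Observable for you, so everything happens in a single pipe with a single subscription.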

    submit() {
      this.subscription = from(this.form.value.users).pipe(
        finalize(() => {
          console.log('finalize');
          this.dismissModal();
        }),
        map(user => this.buildMessage(user)),
        switchMap((message) => this.firestoreService.addMessage(message))
      ).subscribe();
    }
    
    ngOnDestroy(): void {
      this.subscription?.unsubscribe();
    }
    
    dismissModal() {
      this.dialogRef.close('modal dismissed');
    }
    

    Edited

    The function passed to switchMap needs to return an Observable, as shown here. from creates an Observable from a Promise, and of creates an Observable that emits the given values in sequence.

     addMessage(data:Message):Observable<T> {
        console.log('addMessage');
        data.timestamp = firebase.firestore.FieldValue.serverTimestamp();
        return this.authService.currentUser$.pipe(
          switchMap(user => {
            console.log('addMessage user: ', user);
            if (user) {
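              // Wrap the Promise returned by add() so switchMap receives an Observable
              return from(
                this.firestore
                  .collection<any>(user.company)
                  .doc(user.licence)
                  .collection<any>(message)
                  .add(data)
              );
            } else {
              // No signed-in user: emit an empty array instead of writing
              return of([]);
            }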
            }
          )
        );
      }
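
Putting the two pieces together, here is a minimal, self-contained sketch rather than a drop-in implementation. User, Message, AddFn, sendAll, fakeAdd and demoUser$ are hypothetical stand-ins for the form, authService.currentUser$ and the Firestore call. Two things differ from the code above and are assumptions of this sketch. First, take(1) is added: if currentUser$ never completes, the inner observable never completes either, and finalize() would only run on unsubscribe. Second, the outer operator is concatMap rather than switchMap, so each write finishes before the next one starts and no inner observable is unsubscribed when the next user arrives. The no-user branch returns EMPTY rather than of([]), so nothing is emitted for it.

```typescript
import { EMPTY, from, Observable, ObservableInput, of, Subscription } from "rxjs";
import { concatMap, finalize, map, switchMap, take } from "rxjs/operators";

// Hypothetical stand-ins for the question's types and services.
interface User { company: string; licence: string; }
interface Message { to: string; text: string; }
type AddFn = (user: User, data: Message) => ObservableInput<unknown>;

// Mirrors addMessage(): wait for the current user, then wrap the write with from().
function addMessage(
  user$: Observable<User | null>,
  add: AddFn,
  data: Message
): Observable<unknown> {
  return user$.pipe(
    take(1), // assumption: complete after one user so the outer stream can complete
    switchMap(user => (user ? from(add(user, data)) : EMPTY)) // EMPTY: skip the write
  );
}

// Mirrors submit(): one pipe, one subscription, finalize after the last write.
function sendAll(
  recipients: string[],
  user$: Observable<User | null>,
  add: AddFn,
  onDone: () => void
): Subscription {
  return from(recipients)
    .pipe(
      map(to => ({ to, text: "hello" })),
      concatMap(message => addMessage(user$, add, message)), // one write at a time, in order
      finalize(onDone)
    )
    .subscribe();
}

// Usage with a Promise-returning fake in place of the Firestore add() call.
const demoUser$: Observable<User | null> = of({ company: "acme", licence: "L-1" });
const fakeAdd: AddFn = (_user, data) =>
  Promise.resolve().then(() => console.log("saved message for", data.to));

sendAll(["alice", "bob"], demoUser$, fakeAdd, () => console.log("all messages sent"));
```

In the component, onDone would be the place to call this.dismissModal(), and the Subscription returned by sendAll is the one to unsubscribe in ngOnDestroy().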