Tags: angular, typescript, rxjs, rxjs-observables, rxjs-pipeable-operators

Share a subscription if observable not done


I have an Angular application that should sync some data with the server under certain conditions (some triggers in the software, or when the user requests it). So I have a function like this:

    ...

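    public createSyncObservable(): Observable<any> {
        return this.retriveDataFromStorage().pipe(
            // Send the three stored data sets in parallel and wait for all of them
            switchMap(
                (data) => forkJoin([
                    this.api.sendData1(data.data1),
                    this.api.sendData2(data.data2),
                    this.api.sendData3(data.data3)
                ])
            ),
            // Then fetch the current data from the server
            switchMap(
                () => this.api.getDataFromServer()
            ),
            // Finally, store the server data locally
            switchMap(
                (data) => this.updateLocal(data)
            )
        );
    }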

The behaviour I want is:

  • If the user (or some trigger) requests a sync while one is already in progress, I should not start another one, just wait for the current sync to end and return the same (shared) observable.
  • If the last sync has already finished, it should start again (create a new observable).

My best solution for now is to do something like this (untested code):

    ...
    public syncData(): Observable<any> {
        if (this.observable_complete) {
            this.observable_complete = false;
            this.syncObservable$ = this.createSyncObservable().pipe(share())
            this.syncObservable$.subscribe(
                (data) => {this.observable_complete = true}
            ) 
        }
        return this.syncObservable$;
    }

Is this the way to go? Maybe I am missing some RxJS operator that would help me in this case? This solution just seems a bit hacky...


Solution

  • If calling this.createSyncObservable() doesn't do any actual work by itself, and the work only starts when the observable it returns is subscribed to, you only need to call the function once. You could then simply do:

    public syncData$ = this.createSyncObservable().pipe(share());
    

    share unsubscribes from its source when there are no subscribers left, and it resets once the source completes (i.e. when the observable returned by this.createSyncObservable() has completed). So a subscriber to this.syncData$ that arrives after completion triggers a new subscribe to the observable returned from this.createSyncObservable().

    // The first subscribe will trigger a subscribe to createSyncObservable()
    syncData$.subscribe()
    // A second subscribe while the first hasn't completed won't trigger a subscribe
    // to createSyncObservable() but instead just wait for its response
    syncData$.subscribe()
    
    // After some time ...
    // Another subscribe after createSyncObservable() completed will trigger another 
    // subscribe to createSyncObservable()
    syncData$.subscribe()
    

    https://stackblitz.com/edit/rxjs-44qzaj?file=index.ts
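
The behaviour described above can be checked with a small standalone sketch. It is not the asker's real sync pipeline: createSyncObservable() here is a hypothetical stand-in built from defer and a manually controlled Subject, and the names sourceSubscriptions, current and received are made up for illustration. It counts how often the source is actually subscribed to.

```typescript
import { Observable, Subject, defer } from 'rxjs';
import { share } from 'rxjs/operators';

// Counts how many times the underlying "sync" is really started.
let sourceSubscriptions = 0;
// The Subject driving the currently running (fake) sync.
let current: Subject<string> = new Subject<string>();

// Hypothetical stand-in for createSyncObservable(): no work happens
// until the returned observable is subscribed to.
function createSyncObservable(): Observable<string> {
  return defer(() => {
    sourceSubscriptions++;
    current = new Subject<string>();
    return current;
  });
}

const syncData$ = createSyncObservable().pipe(share());
const received: string[] = [];

// First subscriber starts the sync.
syncData$.subscribe((v) => received.push(`A:${v}`));
// Second subscriber joins the sync that is already running.
syncData$.subscribe((v) => received.push(`B:${v}`));
const startsWhileRunning = sourceSubscriptions;

// The running sync emits its result and completes.
current.next('first sync');
current.complete();

// A subscriber arriving after completion starts a new sync.
syncData$.subscribe((v) => received.push(`C:${v}`));
current.next('second sync');
current.complete();

console.log(startsWhileRunning, sourceSubscriptions, received);
```

With this setup, the two overlapping subscribers should share a single source subscription, and the late subscriber should cause a second one. Note that share also resets by default when the source errors, so a failed sync would likewise be restarted by the next subscriber.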