Search code examples
angulartypescriptrxjs5

Pausing and resuming an observable stream, please suggest better options


I have a requirement to listen to a stream of items from an observable. When certain conditions arise an asynchronous task will be performed on the item and the component will be 'busy' until this completes. I would like to pause handling items in the subscription until this task has completed (as the processing of the following items is dependent on the result) and then resume from the next item in the sequence without any loss.

the next part is probably best read whilst looking at the Plunk here

To achieve this I have used a buffer with a swtichMap. I thought these would do the job on their own but switchMap destroys and recreates the subscription the sequence gets reset every time.

export class AppComponent implements OnInit {
    source$: Observable<any>;
    clearBuffer$ = new Subject();
    busy$ = new Subject();

    private itemSubscription: Subscription;
    private stayAliveSubscription: Subscription;

    items: any[] = [];

    constructor() { }

    ngOnInit() {
      this.source$ = Observable.range(1, 500).zip(
        Observable.interval(500),
        function (x, y) { return x; }
      ).share();

      this.busy$
        .subscribe(result => {
          if (!result) {
            this.clearBuffer$.next();
          }
        }, error => {
          console.log(error);
        });
    }

    start() {
      if (!this.itemSubscription) {
        this.itemSubscription =
          this.busy$.switchMap(busy => {
            if (busy) {
              return this.source$.buffer(this.clearBuffer$);
            } else {
              return this.source$;
            }
          })
            .subscribe(items => {
              if (Array.isArray(items)) {
                this.items.push('buffered: ' + items.join());
              } else {
                this.items.push('live feed: ' + items);
              }
            }, error => {
              this.items.push(error);
            });

        this.stayAliveSubscription = this.source$
          .subscribe(result => {
            console.log(result);
          }, error => {
            console.log(error);
          });

        this.busy$.next(false);
      }
   }
...
}

To fix this the source$ observable is now shared and a separate subscription is started (stayAliveSubscription) so a single subscription is used throughout. This seems messy to me and I wanted to ask if anyone can show me better/alternative approaches to the problem.

I put the working sample in a Plunk here click start to start the subscription and then set/unset the busy toggle to buffer and continue.

edit: working code with concatMap

I changed the Plunk to use concatMap. I've pasted the code below as well. The key is that the busy observable returned in concatMap must complete you can't just return the busy$ observable multiple times and call next on it when the busy status changes.

    source$: Observable<any>;
    busy$ = new Subject();
    busy: boolean;

    private itemSubscription: Subscription;
    private stayAliveSubscription: Subscription;

    items: any[] = [];

    constructor() { }

    ngOnInit() {
      this.source$ = Observable.range(1, 500).zip(
        Observable.interval(500),
        function (x, y) { return x; }
      );

      this.busy$
        .subscribe(busy => {
          this.busy = <any>busy;
        });
    }

    start() {
      if (!this.itemSubscription) {
        this.itemSubscription = this.source$.concatMap(item => {
          const busySubject = new Subject();
          this.busy$
            .subscribe(result => {
              busySubject.next(item);
              busySubject.complete();
            });

          if (this.busy) {
            return busySubject;
          } else {
            return Observable.of(item);
          }

        })
          .subscribe(item => {
            this.items.push(item);
          }, error => {
            this.items.push(error);
          });
      }

      this.setBusy(false);
    }

Solution

  • I don't fully understand what you are trying to do, but if it is just a matter of preserving the order of the emitted values while the "async task" can take a long (random) time, I guess you could use the concatMap operator.

    Theory

    concatMap

    Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next. concatMap operator marble diagram

    Practice

    In this example, the srcObservable emits a value every 100ms and each value is mapped to a new observable that emits a value between 0 and 2000ms (the async task). You can see the order is safe.

    let src = Rx.Observable.timer(0,100);
    src.concatMap(i=>{
      return Rx.Observable.timer(Math.random()*2000).mapTo(i); // this is the async task
    }).subscribe(data=>console.log(data));
    <script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>

    Making a hot Observable

    You should also not use these subscriptions to make your observable emit data. Actually you should transform your cold observable to a hot one using .publish() and .connect() instead of share()and subscribe() :

    this.source$ = Observable.range(1, 500).zip(
            Observable.interval(500),
            function (x, y) { return x; }
          ).publish();
    // blah blah blah some code
    this.source$.connect();