I have a requirement to listen to a stream of items from an observable. When certain conditions arise an asynchronous task will be performed on the item and the component will be 'busy' until this completes. I would like to pause handling items in the subscription until this task has completed (as the processing of the following items is dependent on the result) and then resume from the next item in the sequence without any loss.
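As a rough illustration of the behaviour described above, here is a minimal sketch in plain TypeScript, deliberately without RxJS. `GatedQueue`, `push`, `setBusy`, and the sample values are hypothetical names invented for this illustration; they are not part of the Plunk or of any library, and the "async task" is only simulated by toggling the busy flag.

```typescript
// Hypothetical helper for illustration only; not part of RxJS or the Plunk.
class GatedQueue<T> {
  private pending: T[] = [];
  private busy = false;
  private handle: (item: T) => void;

  constructor(handle: (item: T) => void) {
    this.handle = handle;
  }

  // Feed every item from the source through here.
  push(item: T): void {
    if (this.busy || this.pending.length > 0) {
      this.pending.push(item); // held back, nothing is dropped
    } else {
      this.handle(item);
    }
  }

  // Call with true when the async task starts and false when it completes.
  setBusy(busy: boolean): void {
    this.busy = busy;
    // Drain in arrival order, stopping if a handler turns busy back on.
    while (!this.busy && this.pending.length > 0) {
      this.handle(this.pending.shift() as T);
    }
  }
}

const seen: number[] = [];
const queue: GatedQueue<number> = new GatedQueue<number>((item: number) => {
  seen.push(item);
  if (item === 2) {
    queue.setBusy(true); // pretend item 2 triggers the async task
  }
});

[1, 2, 3, 4].forEach((item) => queue.push(item));
const seenWhileBusy = seen.slice(); // items handled before the task finished

queue.setBusy(false); // the task has completed
```

After the loop only 1 and 2 have been handled. Items 3 and 4 are held back and delivered in order once `setBusy(false)` is called, which is the pause-and-resume behaviour wanted from the observable pipeline.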
The next part is probably best read while looking at the Plunk.
To achieve this I have used a buffer with a switchMap. I thought these would do the job on their own, but because switchMap destroys and recreates the subscription to the source each time busy$ emits, the sequence gets reset every time.
export class AppComponent implements OnInit {
  source$: Observable<any>;
  clearBuffer$ = new Subject();
  busy$ = new Subject();
  private itemSubscription: Subscription;
  private stayAliveSubscription: Subscription;
  items: any[] = [];

  constructor() { }

  ngOnInit() {
    this.source$ = Observable.range(1, 500).zip(
      Observable.interval(500),
      function (x, y) { return x; }
    ).share();

    this.busy$
      .subscribe(result => {
        if (!result) {
          this.clearBuffer$.next();
        }
      }, error => {
        console.log(error);
      });
  }

  start() {
    if (!this.itemSubscription) {
      this.itemSubscription =
        this.busy$.switchMap(busy => {
          if (busy) {
            return this.source$.buffer(this.clearBuffer$);
          } else {
            return this.source$;
          }
        })
        .subscribe(items => {
          if (Array.isArray(items)) {
            this.items.push('buffered: ' + items.join());
          } else {
            this.items.push('live feed: ' + items);
          }
        }, error => {
          this.items.push(error);
        });

      this.stayAliveSubscription = this.source$
        .subscribe(result => {
          console.log(result);
        }, error => {
          console.log(error);
        });

      this.busy$.next(false);
    }
  }
  ...
}
To fix this, the source$ observable is now shared and a separate subscription (stayAliveSubscription) is started, so that a single underlying subscription to the source is used throughout. This seems messy to me, and I wanted to ask if anyone can show me better or alternative approaches to the problem.
I put the working sample in a Plunk: click start to start the subscription, then set/unset the busy toggle to buffer and continue.
Edit: working code with concatMap
I changed the Plunk to use concatMap. I've pasted the code below as well. The key is that the observable returned from concatMap must complete; you can't just return the busy$ observable each time and call next on it when the busy status changes.
source$: Observable<any>;
busy$ = new Subject();
busy: boolean;
private itemSubscription: Subscription;
private stayAliveSubscription: Subscription;
items: any[] = [];

constructor() { }

ngOnInit() {
  this.source$ = Observable.range(1, 500).zip(
    Observable.interval(500),
    function (x, y) { return x; }
  );

  // Track the latest busy state so concatMap can check it synchronously.
  this.busy$
    .subscribe(busy => {
      this.busy = <any>busy;
    });
}

start() {
  if (!this.itemSubscription) {
    this.itemSubscription = this.source$.concatMap(item => {
      if (!this.busy) {
        return Observable.of(item);
      }
      // Busy: hold this item until busy$ next emits, then release it and
      // complete so that concatMap moves on to the next item.
      // take(1) unsubscribes from busy$ afterwards, so listeners don't pile up.
      const busySubject = new Subject();
      this.busy$.take(1)
        .subscribe(result => {
          busySubject.next(item);
          busySubject.complete();
        });
      return busySubject;
    })
    .subscribe(item => {
      this.items.push(item);
    }, error => {
      this.items.push(error);
    });
  }
  this.setBusy(false);
}
I don't fully understand what you are trying to do, but if it is just a matter of preserving the order of the emitted values while the "async task" can take a long (random) time, I guess you could use the concatMap operator.
concatMap
Projects each source value to an Observable which is merged in the output Observable, in a serialized fashion waiting for each one to complete before merging the next.
In this example, the src Observable emits a value every 100 ms, and each value is mapped to a new observable that emits after a random delay between 0 and 2000 ms (the async task). You can see that the original order is preserved.
let src = Rx.Observable.timer(0, 100);
src.concatMap(i => {
  return Rx.Observable.timer(Math.random() * 2000).mapTo(i); // this is the async task
}).subscribe(data => console.log(data));
<script src="https://unpkg.com/[email protected]/bundles/Rx.min.js"></script>
You should also not use a subscription such as stayAliveSubscription just to make your observable emit data. Instead, turn your cold observable into a hot one using .publish() and .connect() rather than share() and subscribe():
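// publish() returns a ConnectableObservable, so source$ needs to be
// typed accordingly for connect() to type-check.
this.source$ = Observable.range(1, 500).zip(
  Observable.interval(500),
  function (x, y) { return x; }
).publish();
// blah blah blah some code
this.source$.connect(); // starts the single shared subscription to the source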