I have an observable data store with a private BehaviorSubject that emits the item list every time the store changes.
I also have a public method that returns an observable derived from the BehaviorSubject; it filters the items and skips them based on their state.
How can I return an observable that completes itself once a certain condition is met, without giving that responsibility to the consumer?
EDIT: The solution in this case is to have the consumers use the .take(1) operator, so that the observable completes after it emits the first time.
Here's some code relating to my question:
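import { Injectable } from '@angular/core';
import { BehaviorSubject } from 'rxjs/BehaviorSubject';
import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/throw';
import 'rxjs/add/operator/map';
import 'rxjs/add/operator/mergeMap'; // also patches the flatMap alias
import 'rxjs/add/operator/skipWhile';
// SomeOtherService must be imported from wherever it is defined.

class Item {
    public id: string;
    public state: number;
}

@Injectable()
export class SomeDataService {
    private items: Item[] = []; // The store of items
    private stream$ = new BehaviorSubject<Item[]>([]); // The endless stream of items

    constructor( private someService: SomeOtherService ) {
        // Mirror every upstream change into the store and the stream.
        this.someService.items$.subscribe( items => {
            this.items = items;
            this.stream$.next( items );
        });
    }

    public getObservable( filterID: string ): Observable<Item> {
        return this.stream$.asObservable().map( items => {
            // Find the item in the list and return it
            return items.find( item => {
                return item.id === filterID;
            });
        }).flatMap( item => {
            if( item && item.state === 3 ) { // arbitrary number
                // Return (not throw) the error observable so the error reaches subscribers.
                return Observable.throw( item );
            }
            // Transform the item and such...
            return Observable.of( item );
        }).skipWhile( item => {
            return item && item.state !== 1;
        });
    }
}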
//Some other file to consume the service
this.someDataService.getObservable( 'uniqueID' ).take(1).finally( () => {
    console.log("Now this gets printed when it emits the first time and on errors.");
}).subscribe();
/*
//Previous attempt
this.someDataService.getObservable( 'uniqueID' ).finally( () => {
console.log("I will never print this as the observable never completes.");
}).subscribe();
*/
So the solution here is to have the consumers use the .take(1) operator in order to complete after the first item has been emitted.
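If the service should own completion instead of the consumer, the same idea can be applied inside the service. The sketch below is a dependency-free illustration of what "emit the first matching item, then complete and unsubscribe" means. `TinySubject`, `Observer`, `firstMatching`, and `observerCount` are hypothetical names made up for this example and are not RxJS APIs. With RxJS 5 itself, the rough equivalent would be to append `.take(1)` after `.skipWhile(...)` inside `getObservable`.

```typescript
// Dependency-free sketch. TinySubject and firstMatching are hypothetical
// stand-ins written for illustration; they are not RxJS APIs.
interface Item {
  id: string;
  state: number;
}

interface Observer<T> {
  next: (value: T) => void;
  complete: () => void;
}

// Holds the latest value and replays it to each new subscriber.
class TinySubject<T> {
  private current: T;
  private observers: Observer<T>[] = [];

  constructor(initial: T) {
    this.current = initial;
  }

  next(value: T): void {
    this.current = value;
    // Iterate over a copy so observers may unsubscribe while being notified.
    this.observers.slice().forEach(o => o.next(value));
  }

  // Registers the observer, replays the latest value, returns an unsubscribe function.
  subscribe(observer: Observer<T>): () => void {
    this.observers.push(observer);
    observer.next(this.current);
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }

  get observerCount(): number {
    return this.observers.length;
  }
}

// Emits the first item with the given id and state, then completes and
// detaches from the source, so the consumer never has to clean up.
function firstMatching(
  source: TinySubject<Item[]>,
  id: string,
  wantedState: number,
  observer: Observer<Item>
): void {
  const state: { done: boolean; unsubscribe?: () => void } = { done: false };

  state.unsubscribe = source.subscribe({
    next: items => {
      if (state.done) { return; }
      const item = items.filter(i => i.id === id)[0];
      if (item && item.state === wantedState) {
        state.done = true;
        if (state.unsubscribe) { state.unsubscribe(); }
        observer.next(item);
        observer.complete();
      }
    },
    complete: () => {}
  });

  // The initial replay may have matched before the handle was assigned.
  if (state.done) { state.unsubscribe(); }
}
```

A consumer would call `firstMatching(store, 'uniqueID', 1, { next: ..., complete: ... })` and could rely on `complete` firing once, without applying any operator of its own.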