I'm new to RxJS. I want to create an observable of an AppState
object that may change at any time, and subscribe to it to get those changes. This is a stripped down implementation:
export class AppState {
public get observable(): Observable<any> {
return Observable.of(this._state);
}
}
// appState is injected into my component via angular DI
this.appState.observable
.subscribe((appState) => {
console.log('appState: ', appState);)
}, (err) => {
console.log('Error: ' + err);
}, () =>{
console.log('Completed');
});
But it only runs once and immediately calls completed
. So when I change my appState, the subscription already ended.
How do I keep the subscription alive forever, like KnockOutJS style. This is used in an Angular application
UPDATE: I got it partially working with Subject
. But the problem is now it is emitting many duplicates of the same value.
// full appState.ts
import { Injectable } from '@angular/core';
import { Observable, Subject, BehaviorSubject } from 'rxjs';
export type InternalStateType = {
[key: string]: any
};
@Injectable()
export class AppState {
public _state: InternalStateType = {};
public subject: Subject<any>;
constructor() {
this.subject = new Subject();
}
/**
* Return an observable for subscribing to.
*/
public get observable() {
return this.subject;
}
/**
* Return a clone of the current state.
*/
public get state() {
this._state = this._clone(this._state);
this.subject.next(this._state);
return this._state;
}
/**
* Never allow mutation
*/
public set state(value) {
throw new Error('do not mutate the `.state` directly');
}
public get(prop?: any) {
/**
* Use our state getter for the clone.
*/
const state = this.state;
return state.hasOwnProperty(prop) ? state[prop] : state;
}
public set(prop: string, value: any) {
/**
* Internally mutate our state.
*/
return this._state[prop] = value;
}
private _clone(object: InternalStateType) {
/**
* Simple object clone.
*/
return JSON.parse(JSON.stringify(object));
}
}
What needs to be changed to get it to only emit the change once for each change to this._state
?
You need to use a Subject, or a BehaviorSubject.
A Subject will emit values to subscribers as they are passed to the Subject, while a BehaviorSubject will emit the last value given upon subscription and then continue to emit values as they become available.