I am working with a TypeScript, Angular, NGRX application. I have been writing my state observables without using selectors - the main reason is that I have found that they are less powerful than using RxJS operators directly. As an example, it is not possible to restrict the emission of events using selectors alone - instead a filtering operator must be used.
For the most part, I have had no issues replacing selectors with observables - observables can compose in all of the same ways that selectors can - with one exception: I cannot figure out how to compose observables which may be triggered by the same action. Usually, I have used combineLatest as my go-to observable composer; however, when two observables update on the same action, there is a transient update where one of the observables has a value from the new state and the other still has a value from the previous state.
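The transient update described above can be reproduced outside of NGRX. The following is only a minimal sketch: it assumes a BehaviorSubject (the hypothetical `demoState$`) standing in for the store, and `map` plus `distinctUntilChanged` standing in for `select`.

```typescript
import { BehaviorSubject, combineLatest } from 'rxjs';
import { distinctUntilChanged, map } from 'rxjs/operators';

interface DemoState {
  foo: string;
  bar: string;
}

// Stand-in for the NGRX store (an assumption made for this demo).
const demoState$ = new BehaviorSubject<DemoState>({ foo: 'foo0', bar: 'bar0' });

// Stand-ins for select(state => state.foo) and select(state => state.bar).
const demoFoo$ = demoState$.pipe(map(s => s.foo), distinctUntilChanged());
const demoBar$ = demoState$.pipe(map(s => s.bar), distinctUntilChanged());

const glitchSeen: string[] = [];
combineLatest([demoFoo$, demoBar$]).subscribe(([foo, bar]) => {
  glitchSeen.push(`${foo} - ${bar}`);
});

// One state change that updates both slices, like UPDATE_BOTH.
demoState$.next({ foo: 'foo1', bar: 'bar1' });

console.log(glitchSeen);
// → [ 'foo0 - bar0', 'foo1 - bar0', 'foo1 - bar1' ]
```

The middle entry, `foo1 - bar0`, is the transient value: it mixes the new foo with the previous bar.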
Originally, I considered using the zip observable creator instead; however, while this solves the problem when two observables update together, it does not solve the problem when one observable is updated without the other - as is entirely possible with an NGRX architecture.
I then considered the auditTime(0) operator, which does remove the transient update, but it introduces new problems: 1) it causes observables to emit on a later turn of the event loop, which breaks some assumptions inside the application (solvable, but annoying); 2) it causes the various observables to emit as soon as they can, whereas I would like all observables to emit together, on the same store pulse. Graphically, this means that rendering of different parts of the application is staggered, instead of being drawn together on the same frame (note that our application is very data-heavy, and it is often necessary to drop frames on store pulses).
Finally, I wrote a custom operator to compose observables which are derived from the same source:
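import { combineLatest, Observable, OperatorFunction } from 'rxjs';
import { map, withLatestFrom } from 'rxjs/operators';

// Maps a tuple of value types to a tuple of observables of those types.
export type ObservableTuple<TupleT extends any[]> = {
  [K in keyof TupleT]: Observable<TupleT[K]>;
};

// Emits the latest values of the inputs once per emission of the source.
export function selectFrom<SourceT, TupleT extends any[]>(...inputs: ObservableTuple<TupleT>): OperatorFunction<SourceT, TupleT> {
  return (source$: Observable<SourceT>) => source$.pipe(
    withLatestFrom(combineLatest<TupleT>(inputs)),
    map(([, values]) => values),
  );
}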
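To illustrate why this operator avoids the transient value, here is a minimal sketch under stated assumptions: `selectFromPair` is a hypothetical two-input simplification of selectFrom (chosen to keep the typing simple), and a BehaviorSubject (`pulseState$`) stands in for the store. It relies on the inputs being subscribed to the source before the outer pipeline is, so they are updated first on every pulse.

```typescript
import { BehaviorSubject, Observable, OperatorFunction, combineLatest } from 'rxjs';
import { distinctUntilChanged, map, withLatestFrom } from 'rxjs/operators';

interface PulseState {
  foo: string;
  bar: string;
}

// Hypothetical two-input variant of selectFrom, for illustration only.
function selectFromPair<S, A, B>(a$: Observable<A>, b$: Observable<B>): OperatorFunction<S, [A, B]> {
  return (source$: Observable<S>) => source$.pipe(
    withLatestFrom(combineLatest([a$, b$])),
    map(([, values]) => values),
  );
}

// Stand-in for the NGRX store (an assumption made for this demo).
const pulseState$ = new BehaviorSubject<PulseState>({ foo: 'foo0', bar: 'bar0' });
const pulseFoo$ = pulseState$.pipe(map(s => s.foo), distinctUntilChanged());
const pulseBar$ = pulseState$.pipe(map(s => s.bar), distinctUntilChanged());

const pulseSeen: string[] = [];
pulseState$.pipe(
  selectFromPair(pulseFoo$, pulseBar$),
  map(([foo, bar]) => `${foo} - ${bar}`),
).subscribe(value => pulseSeen.push(value));

pulseState$.next({ foo: 'foo1', bar: 'bar1' }); // both slices change
pulseState$.next({ foo: 'foo2', bar: 'bar1' }); // only foo changes

console.log(pulseSeen);
// → [ 'foo0 - bar0', 'foo1 - bar1', 'foo2 - bar1' ]
```

Each store pulse produces exactly one emission, and no entry mixes old and new state. Note that this depends on subscription order, which is the fragility discussed further down.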
Here is a summary of the problem in TypeScript (using snippets of NGRX, RxJS, and Angular):
interface IState {
  foo: string;
  bar: string;
}

@Injectable()
class SomeService {
  // `private readonly` makes store$ a class member, so this.store$ is defined below
  constructor(private readonly store$: Store<IState>) {
  }

  readonly foo$ = this.store$.pipe(select(state => state.foo));
  readonly bar$ = this.store$.pipe(select(state => state.bar));

  readonly composed$ = this.store$.pipe(
    selectFrom(
      this.foo$,
      this.bar$,
    ),
    map(([foo, bar]) => `${foo} - ${bar}`),
  );
}

const UPDATE_FOO = {
  type: 'update foo',
  foo: 'some updated value for foo',
};

const UPDATE_BAR = {
  type: 'update bar',
  bar: 'some updated value for bar',
};

const UPDATE_BOTH = {
  type: 'update both',
  both: 'some updated value for both foo and bar',
};
This works correctly even when selectFrom calls are nested within one another, e.g.:
readonly composed2$ = this.store$.pipe(
  selectFrom(
    this.composed$,
    this.foo$,
  ),
);
So long as composed$ is defined before composed2$, everything works out; however, a case I did not consider is using an operator like switchMap in between composed$ and composed2$. In this case, because composed2$ is destroyed and recreated by switchMap, it is possible for composed2$ to fire before composed$, which causes everything to get out of sync.
For your specific problem of trying to compose 2 observables and only emit after both of them have finished emitting, you can try to take advantage of the queue scheduler together with the observeOn and debounce operators.
Then you could do something like the following:
readonly queuedStore$ = this.store$.pipe(
  observeOn(queue), // use queue scheduler to listen to store updates
  share(), // use the same store event to update all of our selectors
);

// use queuedStore$ here
readonly foo$ = this.queuedStore$.pipe(select(state => state.foo));
readonly bar$ = this.queuedStore$.pipe(select(state => state.bar));

// when composing, debounce the combineLatest() with an observable
// that completes immediately, but completes on the queue scheduler
readonly composed$ = combineLatest(this.foo$, this.bar$).pipe(
  debounce(() => empty().pipe(observeOn(queue))),
);
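What will happen?

Foo update:
- The store notification is delivered on the queue scheduler
- foo$ notifies combineLatest
- combineLatest notifies debounce
- debounce subscribes to durationSelector, whose completion is scheduled on the queue scheduler

Bar update:
- Works same as Foo update

Both update:
- The store notification is delivered on the queue scheduler
- foo$ notifies combineLatest
- combineLatest notifies debounce
- debounce subscribes to durationSelector, whose completion is scheduled on the queue scheduler
- bar$ notifies combineLatest
- combineLatest notifies debounce
- debounce throws away the previous value from the foo notification
- debounce resubscribes to durationSelector
- The queue scheduler then runs the pending durationSelector work
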
In theory this gets you your desired behavior:
- Single updates apply immediately (before next tick)
- Combined update applies immediately (before next tick)
- Combined updates ignore intermediate result
- Should still work if your composed observable uses a switch operator such as switchMap.
If you dispatch another event while handling one of these notifications on the queue scheduler, the notifications for that second event will be deferred until after the current handler completes.
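The deferral behavior of the queue scheduler can be seen in isolation. This is a minimal sketch using `queueScheduler` (the name under which RxJS exports the queue scheduler); the `order` array and its labels are hypothetical names used only for the demo.

```typescript
import { queueScheduler } from 'rxjs';

const order: string[] = [];

queueScheduler.schedule(() => {
  order.push('outer start');

  // Scheduled while the outer task is still running, so it is queued
  // and only runs once the outer task has returned.
  queueScheduler.schedule(() => order.push('inner'));

  order.push('outer end');
});

console.log(order);
// → [ 'outer start', 'outer end', 'inner' ]
```

Everything still runs synchronously, before `console.log` is reached, but the nested task waits for the current one to finish. This is the same mechanism that lets debounce see both the foo and the bar notification before its durationSelector work runs.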