I have a NGXS action that triggers a side-effect. The side-effect gets a list of todos
as an Observable stream (e.g. firestore collection).
Depending on the success or failure, the side-effect will dispatch the corresponding action.
// todos.state.ts
export class TodoState implements OnDestroy{
private ngUnsubscribe = new Subject();
private readonly todos$: Observable<any>;
constructor(private todosApiService: TodosApiService) {
this.todos$ = this.todosApiService.getTodos() // Returns Observable
}
@Action(Todos.Get, {cancelUncompleted: true})
getTodos({patchState, dispatch}: StateContext<TodosStateModel>, action: Todos.Get) {
patchState({todos: action}) // Set loading flag to true
return this.todosApiService.getTodos().pipe(
map((todos) => dispatch(new Todos.GetUpdate(todos, action))), // Set data
catchError((error) => dispatch(new Todos.GetError(error, action))), // Set error
takeUntil(this.ngUnsubscribe)
)
}
@Action([Todos.GetUpdate, Todos.GetError, Todos.GetComplete], {cancelUncompleted: true})
getTodosChange(
{patchState, getState}: StateContext<TodosStateModel>,
action: Todos.GetUpdate | Todos.GetError | Todos.GetComplete) {
patchState({todos: action})
}
... // unsubscribe on destroy
}
Now, in my component I subscribe to the state changes like so:
// todos.component.ts
...
this.store.dispatch(new Todos.Get())
this.store.select(state => state.todos.todos)
.pipe(
tap(todos => console.log("todos", todos)), // Prints changes correctly
takeUntil(this.ngUnsubscribe), // Unsubscribes from state changes correctly
).subscribe()
// Unsubscribe from state changes after 10 seconds (simulation)
setTimeout(() => {
this.ngUnsubscribe.next();
this.ngUnsubscribe.complete();
}, 10000);
...
In this example the component will receive all changes emitted to the observable stream within 10 seconds correctly. After the 10 seconds passed, the this.store.select
-pipe will be "closed" and the component won't receive any state changes anymore.
This is expected behaviour and works fine so far.
Now the issue that's bothering me is the following:
The original observable stream from the dispatched action is still "open". Any changes emitted in the stream will still be reflected in the state, since the Update
side-effect will be triggered. Whether the component that originally dispatched the action subscribes to the state changes or not.
Any other subscription on the state will pick up on the changes.
I'd like to be able to close a stream (and "end" an action) from the component. Unfortunately, this doesn't work:
// todos.component.ts
this.store.dispatch(new Todos.Get())
.pipe(
takeUntil(this.ngUnsubscribe),
)
I'm not sure how to implement NGXS actions with side-effects using observables correctly. I read on multiple occasions that NGXS (at least in parts) unsubscribes from actions automatically. I'm not sure if this also happens with rxjs streams that have been returned from an action like in my case. I'm also not sure if this is the expected behaviour when working with NGXS observables. I'm also not sure what might happen if multiple components dispatch the Get
-Action. Are there multiple streams now or is it just one stream?
Ideally, every dispatch action has its own stream, that I will be able to close from the component. This way, I can decide whether a component should listen to changes or not and not accidentally close a "global" stream for all components or risk "data-leaks" by not closing streams properly.
I solved this issue. I got inspired by this answer on a similar question posted about ngrx/rxjs. This is how:
I added another action:
// todos.actions.ts
export class GetCancel {
static readonly type = '[Todos] Get Todos Cancel';
constructor() {
}
}
Then I modified the side-effect:
// todos.state.ts
export class TodoState implements OnDestroy{
private readonly todos$: Observable<any>;
constructor(private todosApiService: TodosApiService,
private actions: Actions) {
this.todos$ = this.todosApiService.getTodos() // Returns Observable
}
@Action(Todos.Get, {cancelUncompleted: true})
getTodos({patchState, dispatch}: StateContext<TodosStateModel>, action: Todos.Get) {
patchState({todos: action}) // Set loading flag to true
return this.todosApiService.getTodos().pipe(
switchMap((todos) => dispatch(new Todos.GetUpdate(todos, action))), // Set data
catchError((error) => dispatch(new Todos.GetError(error, action))), // Set error
takeUntil(this.actions.pipe(ofActionDispatched(Todos.GetCancel))) // <-- HERE cancel action
)
}
...
}
Now I am able to use the entire action in my components like this:
// todos.component.ts
...
ngOnInit() {
this.store.dispatch(new Todos.Get())
this.store.select(state => state.todos.todos)
.subscribe(todos => console.log("todos", todos)); // Prints changes correctly
// (as long as the Cancel-Action hasn't been dispatched)
}
ngOnDestroy(): void {
// Dispatching the Cancel-Action means that all
// Get-Streams will be closed. The Update-Action won't be
// triggered anymore. If you have another component that is
// also listening to state changes or want to start to listen to state
// changes again, you have to dispatch another Get action.
this.store.dispatch(new Todos.GetCancel())
}
...
This way I have full control over the observable streams in my side-effects within my components. Meaning I can decide how long to listen to e.g. firestore or websocket changes (or any observable for that matter), thus preventing "data/memory" leaks by accidentally not closing them.