I am trying to solve the following use case, but the more I read the more confused I get. Perhaps someone could show me the right direction.
Let's say I have some sort of JavaScript service that is called by some client and triggers a calculation via a REST endpoint on a backend server. The necessary HTTP request is implemented using RxJS observables. Assume that the calculation is done based on separate domain entities - I will come back to this in a moment.
The JavaScript service may be called multiple times within a short period. Sometimes the interval between service invocations is shorter than the time it takes for the corresponding HTTP response to arrive. As service invocations may occur for the same domain entity (but with modified data), an in-flight HTTP request for a domain entity should be cancelled when another calculation is triggered for that same entity.
I know that I can use switchMap to "switch to a new inner observable" while the pending inner observable is cancelled.
import { of, Subject } from "rxjs";
import { delay, switchMap, tap } from "rxjs/operators";
const outer = new Subject();
const inner = (x) => of("Calculation Result " + JSON.stringify(x)).pipe(
  tap(() => console.log('Begin Calculation', x)),
  delay(1000)
);
function startCalc(x) {
  outer.next(x);
}
outer.pipe(switchMap((x) => inner(x))).subscribe(x => console.log(x));
startCalc({id: 'A', data: 1}); // Entity A
startCalc({id: 'B', data: 1});
startCalc({id: 'A', data: 3}); // Entity A again
startCalc({id: 'C', data: 1});
startCalc({id: 'D', data: 1});
This snippet prints Calculation Result {"id": "D", "data": 1} to the console. I understand that all the earlier invocations (for entities A, B, A, and C) are cancelled, because each one is superseded by the next invocation before the inner observable emits its calculation response.
How can I make the outer observable emit
Calculation Result {"id": "B", "data": 1}
Calculation Result {"id": "A", "data": 3}
Calculation Result {"id": "C", "data": 1}
Calculation Result {"id": "D", "data": 1}
with the first invocation for entity A being cancelled, because a second invocation for the same entity took place before the calculation result for the first invocation was returned?
Any help is really appreciated.
You could use the groupBy operator to split the outer observable into multiple observables based on the id property, and then apply switchMap to each of those split observables.
It'd be something like this.
import { groupBy, mergeMap, switchMap } from "rxjs/operators";
outer.pipe(
  groupBy((item) => item.id), // splits the main source into one observable per entity id
  mergeMap( // subscribes to all of those per-entity observables in parallel
    (entityObs) => entityObs.pipe(switchMap((x) => inner(x))) // switches within each entity only
  )
)
.subscribe((x) => console.log(x));
Cheers