My desired behaviour:
So basically I would like to kick off two async processes, one of which is supposed to provide a value quickly but if it doesn't - I want to only use the value from a slower observable which takes precedence anyway.
To expand from my comments: the question is to trigger two observables in parallel and utilize the first emission even if the other observable hasn't emitted yet.
Normally you could use the merge function for this.
However, you have a condition ("If HTTP responds faster than cache - ignore cache.") that is not fulfilled natively by the merge function or by any standard RxJS operator.
But it is easy to write custom operators in RxJS from existing operators. For your case you could customize the filter operator to suit your needs. See here for a brief intro on how to write a custom operator.
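import { Observable } from 'rxjs';
import { filter } from 'rxjs/operators';

export const filterLateCache = () => {
  // Set once the server has emitted; later cache values are then dropped.
  // Note: the flag is created per filterLateCache() call, so it is shared
  // by every subscriber of the same piped observable.
  let serverEmitted = false;
  return <T>(source: Observable<T>) => {
    return source.pipe(
      filter((data: any) => {
        if (data.server) {
          serverEmitted = true;
          return true;
        }
        // Let a cache value through only if the server hasn't emitted yet.
        // Anything without a server or cache flag is dropped.
        return !!data.cache && !serverEmitted;
      })
    );
  };
};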
As you can see, the boolean flags server and cache in each incoming notification are checked to decide whether the value is emitted. So you need to tag the values from each observable with these flags using the map operator.
import { merge } from 'rxjs';
import { map } from 'rxjs/operators';

merge(
  // Tag each source so the custom operator can tell them apart.
  server$.pipe(
    map((value) => ({
      server: true,
      value: value,
    }))
  ),
  cache$.pipe(
    map((value) => ({
      cache: true,
      value: value,
    }))
  )
)
  .pipe(filterLateCache())
  .subscribe({
    next: ({ value }) => { // <-- utilize destructuring to ignore boolean flags
      // handle response here
    },
    error: (error: any) => {
      // handle errors
    }
  });
Working example: Stackblitz
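If you want to check the filtering rule on its own, without timers or HTTP calls, here is a minimal sketch that applies the same decision logic to plain arrays listed in arrival order. It does not use RxJS at all, and the names Tagged and makeLateCacheFilter are made up for this illustration; they are not part of the operator above.

```typescript
// Hypothetical shape of a tagged notification, mirroring the flags used above.
interface Tagged<T> {
  server?: boolean;
  cache?: boolean;
  value: T;
}

// Returns a stateful predicate: server values always pass, cache values pass
// only until the first server value has been seen, untagged values are dropped.
function makeLateCacheFilter<T>(): (data: Tagged<T>) => boolean {
  let serverEmitted = false;
  return (data: Tagged<T>): boolean => {
    if (data.server) {
      serverEmitted = true;
      return true;
    }
    return !!data.cache && !serverEmitted;
  };
}

// Case 1: the cache arrives first, so both values are delivered in order.
const cacheFirst: Tagged<string>[] = [
  { cache: true, value: 'cached' },
  { server: true, value: 'fresh' },
];
const cacheFirstResult: string[] = cacheFirst
  .filter(makeLateCacheFilter<string>())
  .map((d) => d.value);

// Case 2: the server arrives first, so the late cache value is dropped.
const serverFirst: Tagged<string>[] = [
  { server: true, value: 'fresh' },
  { cache: true, value: 'cached' },
];
const serverFirstResult: string[] = serverFirst
  .filter(makeLateCacheFilter<string>())
  .map((d) => d.value);

console.log(cacheFirstResult); // [ 'cached', 'fresh' ]
console.log(serverFirstResult); // [ 'fresh' ]
```

Each call to makeLateCacheFilter creates a fresh flag, which is why the two cases above do not affect each other.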