I want to learn more about writing declarative/functional code with RxJS. I'm trying to use splitting, merging, and other forms of stream coordination to encapsulate the logic, rather than imperatively handling things. I have an example scenario that hopefully you can help me with.
When a user clicks, I want to log statistics about their click & hover behavior. I also want to show a notification if the statistics are considered 'primary'.
const click$ = fromEvent(document, 'click');
// emits very frequently, as the user moves the mouse around the screen
const hover$ = new Subject<CustomHoverEvent>();
// The user can hover their mouse around the application, and when
// they click, emit the latest hover event. If there was a click without
// any hovers, dont emit anything.
const lastHoverEvent$ = hover$.pipe(
buffer(click$),
map(events => events[events.length - 1]),
filter(event => !!event)
);
const [primaryHover$, secondaryHover$] = partition(
lastHoverEvent$,
(event) => event.isPrimaryHover()
);
const primaryHoverStats$ = primaryHover$.pipe(
map(event => calculatePrimaryHoverStats(event)
);
const primaryHoverStatsNotification$ = primaryHoverStats$.pipe(
map(stats => createNotification(stats)
);
const secondaryHoverStats$ = secondaryHover$.pipe(
map(event => calculateSecondaryHoverStats(event)
);
const hoverStats$ = merge(primaryHoverStats$, secondaryHoverStats$);
hoverStats$.subscribe(stats => logStats(stats));
primaryHoverStatsNotification$.subscribe(notification => notify(notification));
Here is how the above streams currently work. Each color represents the code that is run due to each subscription. All of the upstream work is repeated for each subscriber (buffering the events, and calculating the stats for primary).
But here is how I actually want it to work. Where the work that is done along the pipeline is not repeated, and each subscriber only handles its part.
To fix this, I added few share()
operators, in order to multicast the work that has already been done.
const lastHoverEvent$ = hover$.pipe(
buffer(click$),
map(events => events[events.length - 1]),
filter(event => !!event),
share()
);
...
const primaryHoverStats$ = primaryHover$.pipe(
map(event => calculatePrimaryHoverStats(event),
share()
);
The type of flow I am trying to replicate looks like this with plain functions.
const lastHoverEvent = getLastHoverEvent();
let hoverStats;
if (lastHoverEvent.isPrimaryHover()) {
hoverStats = calculatePrimaryHoverStats(lastHoverEvent);
const notification = createNotification(hoverStats);
notify(notification);
} else {
hoverStats = calculateSecondaryHoverStats(lastHoverEvent);
}
logStats(hoverStats);
or like this with a map
in the original code
hover$.pipe(
buffer(click$),
map(events => events[events.length - 1]),
filter(event => !!event),
map(event => {
let hoverStats;
if (event.isPrimaryHover()) {
hoverStats = calculatePrimaryHoverStats(event);
const notification = createNotification(hoverStats);
notify(notification);
} else {
hoverStats = calculateSecondaryHoverStats(event);
}
return hoverStats;
})
).subscribe(stats => logStats(stats));
Is adding share
to your streams really necessary to allow RxJS to work this way? In the 'plain functions' example above, it seems very simple and nothing out of the ordinary, but when you try to do it by splitting up the streams into separate events, duplicate upstream work is done which seems a bit unintuitive to me.
I feel like I'm missing something. Is this not a good way to go about writing RxJS? I feels so right, when thinking about the logic in terms of the diagrams above, like interconnected streams... but the overhead of share
seems weird. Should I just stick to something like the map
solution in the last code example?
(unrelated question) I used buffer
to get the latest hover event on click. I was thinking about using ReplaySubject
and withLatestFrom()
, to reduce the in-memory storage of hover events (because I only need the latest one).
const click$ = fromEvent(document, 'click');
const hover$ = new ReplaySubject<CustomHoverEvent>(1);
const lastHoverEvent$ = click$.pipe(
withLatestFrom(hover$),
map(([click, hover]) => hover),
...
);
but this doesn't work because I need the click$
event to clear whatever hover events have already passed. For example, if a user clicks 2 times in a row without hovering on anything, I don't want a stale/old hover event to be retrieved.
Is there a better way to do this without buffer
? It seems wasteful to store all the events in memory just to get the last one. I was hoping to find a size configuration on buffer
, and set it to 1, but I don't see any.
Although it may seem a bit counterintuitive at first, share()
is definitely needed in order to not unnecessarily subscribe to sources.
Basically, every observable source can be reduced so something like this:
new Observable(subscriber => {
...
});
Note: in case of Subjects, things are slightly different.
For example, here are some relevant parts of fromEvent
:
return new Observable<T>((subscriber) => {
const handler = (...args: any[]) => subscriber.next(1 < args.length ? args : args[0]);
// ...
}
Despite defining data sources very declaratively(e.g. hover$
-> lastHoverEvent$
-> primaryHover$
-> ...), every subscription will result in subscribing to the source once again. In case of hover$
, each subscription to any of its derived sources(e.g. primaryHover$
) will result in a new subscriber being added to the hover$
Subject's subscribers list.
The way the share()
operator prevents subscribing multiple times to a data source is by employing a local Subject that will keep track of all subscribers. The original data source will be subscribed only once, by checking how many subscribers are active.
I think you can still go with the withLatestFrom
approach, but using a Subject
instead of a ReplaySubject
.
const click$ = fromEvent(document, 'click');
const hover$ = new Subject<CustomHoverEvent>();
const lastHoverEvent$ = click$.pipe(
withLatestFrom(hover$),
// Covering the case where `hover$` has been cleared.
filter(([, hover]) => !!hover),
map(([click, hover]) => hover),
tap({ next: () => {
// Clearing the possible stale hover events.
hover$.next(null);
}}),
);
Here are the scenarios I can think of based on the above snippet:
hover$
will next()
the latests hover event; when the user clicks, due to using withLatestFrom
, only the latest value will be usedhover$.next(null)
will be invoked and so the value that withLatestFrom
keeps with regards to hover$
will be null
; then, when the user clicks the second time, the filter
condition will not pass; however, if the user hovers again, the internal value of hover$
will be updated accordingly and so, if the user clicks, the filter
pass will check