
RxJS interleaving merged observables (priority queue?)


UPDATE

I think I've figured out the solution. I explain it in this video. Basically, use timeoutWith, and some tricks with zip (within zip).

https://youtu.be/0A7C1oJSJDk


If I have a single observable like this:

A-1-2--B-3-4-5-C--D--6-7-E

I want to treat the "numbers" as lower priority: a number should wait until the "letters" have filled up a group (of 2, for example) OR a timeout is reached, and only then may it be emitted. The following illustration of the desired result may help:

A------B-1-----C--D-2----E-3-4-5-6-7

I've been experimenting with some ideas. One of them: the first step is to split the stream (groupBy) into one substream containing letters and another containing numbers; then "something in the middle" happens; and finally the two substreams get merged.

It's that "something in the middle" that I'm trying to figure out.

How can I achieve this? Is it even possible with RxJS (version 5.5.6)? If not, what's the closest alternative? What I want to avoid is having the "numbers" flood the stream without giving the "letters" enough chance to be processed in a timely manner.

This video of my efforts so far may clarify things as well:

My solution so far (delaying each emission in the "numbers" substream using .delay) is suboptimal, because it keeps clocking at a slow pace (10 seconds) even after the "letters" substream has gone quiet (not completed, since there is no clear boundary here; it just stops receiving values for an indeterminate amount of time). What I really need is for the "numbers" substream to raise its pace (to one emission every 2 seconds) once that happens.
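To make the pacing requirement concrete, here is a minimal sketch in plain JavaScript (no RxJS involved). It only picks the delay for the next number from how long the letters have been quiet. The helper name `numberDelay` and the 5-second idle threshold are assumptions made for illustration; only the 10-second and 2-second paces come from the description above.

```javascript
// Hypothetical helper, not an RxJS API: choose the pace for the "numbers"
// substream depending on how recently a letter was seen.
const SLOW_MS = 10000; // pace while letters are still arriving
const FAST_MS = 2000;  // pace once the letters have gone quiet

// `idleAfterMs` (assumed value) is how long without a letter counts as "quiet".
function numberDelay(now, lastLetterAt, idleAfterMs = 5000) {
    const lettersIdle = lastLetterAt === null || now - lastLetterAt >= idleAfterMs;
    return lettersIdle ? FAST_MS : SLOW_MS;
}

console.log(numberDelay(12000, 10000)); // letter seen 2 s ago  -> 10000
console.log(numberDelay(20000, 10000)); // letter seen 10 s ago -> 2000
```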


Solution

  • Unfortunately I don't know RxJS 5 that well; I use xstream myself (written by one of the contributors to RxJS 5), which is a little simpler in terms of the number of operators.

    With it I crafted the following example. (Note: the operators are pretty much the same as in Rx 5; the main difference is flatten, which is more or less like switch but seems to handle synchronous streams differently.)

    const xs = require("xstream").default;
    
    const input$ = xs.of("A",1,2,"B",3,4,5,"C","D",6,7,"E");
    
    const initialState = { $: xs.never(), count: 0, buffer: [] };
    const state$ = input$
        .fold((state, value) => {
            const t = typeof value;
            if (t === "string") {
                return {
                    ...state,
                    $: xs.of(value),
                    count: state.count + 1
                };
            }
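            if (state.count >= 2) {
                // Enough letters have passed: queue this number, then release the oldest one.
                // Queuing first avoids emitting `value` twice when the buffer is empty.
                const queue = state.buffer.concat(value);
                return {
                    ...state,
                    $: xs.of(queue[0]),
                    count: 0,
                    buffer: queue.slice(1)
                };
            }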
            return {
                ...state,
                $: xs.never(),
                buffer: state.buffer.concat(value),
            };
        }, initialState);
    
    
    xs
        .merge(
            state$
            .map(s => s.$),
            state$
            .last()
            .map(s => xs.of.apply(xs, s.buffer))
        )
        .flatten()
        .subscribe({
            next: console.log
        });
    

    This gives me the result you are looking for.

    It works by folding the stream on itself, looking at the type of each value and emitting a new stream depending on it. When a number has to wait because not enough letters have been dispatched, I emit a never-ending stream (xs.never(): no values, no errors, no completion) as a "placeholder".
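The folding logic can also be followed without any stream library. The sketch below uses plain arrays in place of streams and `Array.prototype.reduce` in place of `fold`; the names `prioritize` and `groupSize` are made up for this illustration. It mirrors the idea, not xstream's behavior in detail, and it queues each incoming number before releasing the oldest one.

```javascript
// Plain-JavaScript sketch of the folding idea, with arrays standing in for streams.
// `prioritize` and `groupSize` are illustrative names, not part of xstream or RxJS.
function prioritize(input, groupSize = 2) {
    const initial = { out: [], count: 0, buffer: [] };
    const final = input.reduce((state, value) => {
        if (typeof value === "string") {
            // Letters are high priority: emit right away and count them.
            return { ...state, out: state.out.concat(value), count: state.count + 1 };
        }
        if (state.count >= groupSize) {
            // Enough letters went through: queue this number, release the oldest.
            const queue = state.buffer.concat(value);
            return { out: state.out.concat(queue[0]), count: 0, buffer: queue.slice(1) };
        }
        // Not enough letters yet: the number waits in the buffer.
        return { ...state, buffer: state.buffer.concat(value) };
    }, initial);
    // When the input ends, flush whatever numbers are still waiting.
    return final.out.concat(final.buffer);
}

console.log(prioritize(["A", 1, 2, "B", 3, 4, 5, "C", "D", 6, 7, "E"]).join(" "));
// prints: A B 1 C D 2 E 3 4 5 6 7
```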

    Instead of emitting this placeholder stream, you could emit something like

    xs.periodic(timeout).take(1).mapTo(value);
    // stream that will emit a value only after a specified timeout.
    // Because the streams are **not** flattened concurrently you can
    // use this as a "pending" stream that may or may not be eventually
    // consumed
    

    where value is the last number received, in order to implement timeout-related conditions. However, you would then need to introduce some kind of reflexivity (a feedback loop), with either a Subject in Rx or imitate in xstream, because you would need to notify your state that the "pending" stream has been consumed. That makes the communication bidirectional, whereas streams / observables are unidirectional.
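To show why the state has to learn about consumption, here is a rough sketch of the same rules written as a small stateful object in plain JavaScript, with time passed in explicitly. `PriorityGate`, `push` and `release` are hypothetical names, and restarting the timeout clock after each release is a design assumption, not something taken from the question. A stream-based version would have to route the "consumed" update back into the fold, which is where a Subject or imitate comes in.

```javascript
// Sketch of a stateful gate: numbers wait until `groupSize` letters have passed
// or until `timeoutMs` has elapsed since the oldest waiting number started waiting.
// PriorityGate and its method names are made up for this illustration.
class PriorityGate {
    constructor(groupSize, timeoutMs) {
        this.groupSize = groupSize;
        this.timeoutMs = timeoutMs;
        this.count = 0;    // letters seen since the last released number
        this.pending = []; // waiting numbers as { value, since }
    }
    // Feed one value at time `now` (ms); returns the values to emit.
    push(value, now) {
        if (typeof value === "string") {
            this.count += 1;
            return [value];
        }
        this.pending.push({ value, since: now });
        return this.release(now);
    }
    // Call on a timer tick as well, so the timeout can fire without new input.
    release(now) {
        if (this.pending.length === 0) return [];
        const timedOut = now - this.pending[0].since >= this.timeoutMs;
        if (this.count < this.groupSize && !timedOut) return [];
        // Consuming the pending number updates the state right here; this is the
        // feedback that a one-way stream pipeline would have to route back.
        this.count = 0;
        const next = this.pending.shift();
        if (this.pending.length > 0) this.pending[0].since = now; // restart the clock
        return [next.value];
    }
}

const gate = new PriorityGate(2, 2000);
console.log(gate.push("A", 0));   // [ 'A' ]
console.log(gate.push(1, 100));   // []      (only one letter so far)
console.log(gate.push("B", 200)); // [ 'B' ]
console.log(gate.release(300));   // [ 1 ]   (two letters have passed)
console.log(gate.push(2, 400));   // []      (letter count was reset)
console.log(gate.release(2400));  // [ 2 ]   (timeout reached)
```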