
How to delay event emission with rxpy/rxjs?


I've got two event streams. One is from an inductance loop, the other is an IP camera. Cars will drive over the loop and then hit the camera. I want to combine them if the events are within N milliseconds of each other (car will always hit the loop first), but I also want the unmatched events from each stream (either hardware can fail) all merged into a single stream. Something like this:

           ---> (only unmatched a's, None)
         /                                  \
stream_a (loop)                              \
         \                                    \
            --> (a, b) ---------------------------> (Maybe a, Maybe b)
         /                                    /
stream_b  (camera)                           /
         \                                  /
            --> (None, only unmatched b's)
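To pin down the pairing rule in the diagram, here is a minimal offline sketch in plain Python, with no Rx involved. It assumes each event is represented only by its timestamp in milliseconds, that a `b` is matched with the oldest pending `a` that precedes it by at most `window_ms`, and that everything else comes out unmatched. The name `pair_events` and its parameters are hypothetical, chosen only for illustration.

```python
def pair_events(a_times, b_times, window_ms):
    """Pair each b with the oldest pending a at most window_ms before it."""
    # Tag and sort; at equal timestamps "a" sorts before "b" (loop first).
    events = sorted([(t, "a") for t in a_times] + [(t, "b") for t in b_times])
    pending = []  # timestamps of a's still waiting for a b
    out = []
    for t, kind in events:
        # Any a whose window has already elapsed is reported as unmatched.
        while pending and t - pending[0] > window_ms:
            out.append((pending.pop(0), None))
        if kind == "a":
            pending.append(t)
        elif pending:
            out.append((pending.pop(0), t))   # matched (a, b)
        else:
            out.append((None, t))             # unmatched b
    # a's still pending when the input ends never found a b.
    out.extend((t, None) for t in pending)
    return out


print(pair_events([0, 100, 300], [20, 500], 50))
# → [(0, 20), (100, None), (300, None), (None, 500)]
```

A streaming solution should produce the same `(Maybe a, Maybe b)` tuples, only incrementally.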

Now certainly I can hack my way around by doing the good ole Subject anti-pattern:

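import threading

from rx.subjects import Subject  # assumes the RxPY 1.x import path

unmatched_a = Subject()

def noop(_):
    # subscribe() calls this with each value, so it must accept one argument.
    pass

pending_as = []

def handle_unmatched(a):
    # Runs on the timer thread once the timeout has elapsed.
    if a in pending_as:
        pending_as.remove(a)
        print("unmatched a!")
        unmatched_a.on_next((a, None))

def handle_a(a):
    pending_as.append(a)
    # threading.Timer takes its delay in seconds; pass `a` on to the callback.
    t = threading.Timer(some_timeout, handle_unmatched, args=(a,))
    t.start()
    return a

def handle_b(b):
    if pending_as:
        a = pending_as.pop(0)
        return (a, b)
    else:
        print("unmatched b!")
        return (None, b)

stream_a.map(handle_a).subscribe(noop)
stream_b.map(handle_b).merge(unmatched_a).subscribe(print)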

Not only is this rather hacky, but although I've not observed it I'm pretty sure there's a race condition when I check the pending queue using threading.Timer. Given the plethora of rx operators, I'm pretty sure some combination of them will let you do this without using Subject, but I can't figure it out. How does one accomplish this?
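The suspected race is real in principle: the timer thread can test membership in the pending list while the `b` handler pops from it on another thread. As a stopgap that keeps the Subject approach, one option is to put the list behind a single lock so that each check-and-remove happens atomically. The following is only a sketch; `PendingQueue` and its method names are hypothetical, and it assumes `None` is never a valid `a` event.

```python
import threading


class PendingQueue:
    """FIFO of pending a's whose check-and-remove steps run under one lock."""

    def __init__(self):
        self._lock = threading.Lock()
        self._items = []

    def push(self, a):
        with self._lock:
            self._items.append(a)

    def pop_oldest(self):
        """Return the oldest pending a, or None if nothing is pending."""
        with self._lock:
            return self._items.pop(0) if self._items else None

    def discard(self, a):
        """Remove a if it is still pending; return True if it was removed."""
        with self._lock:
            if a in self._items:
                self._items.remove(a)
                return True
            return False
```

With this in place, the timer callback would emit `(a, None)` only when `discard(a)` returns `True`, and the `b` handler would call `pop_oldest()` instead of touching the list directly.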

Edit

Although for organizational and operational reasons I'd prefer to stick to Python, I'll take a JavaScript rxjs answer and either port it or even possibly rewrite the entire script in node.


Solution

  • You should be able to solve the problem using auditTime and buffer, like this:

    function matchWithinTime(a$, b$, N) {
      const merged$ = Rx.Observable.merge(a$, b$);
      // Use auditTime to compose a closing notifier for the buffer.
      const audited$ = merged$.auditTime(N);
      // Buffer emissions within an audit and filter out empty buffers.
      return merged$
        .buffer(audited$)
        .filter(x => x.length > 0);
    }
    
    const a$ = new Rx.Subject();
    const b$ = new Rx.Subject();
    matchWithinTime(a$, b$, 50).subscribe(x => console.log(JSON.stringify(x)));
    
    setTimeout(() => a$.next("a"), 0);
    setTimeout(() => b$.next("b"), 0);
    setTimeout(() => a$.next("a"), 100);
    setTimeout(() => b$.next("b"), 125);
    setTimeout(() => a$.next("a"), 200);
    setTimeout(() => b$.next("b"), 275);
    setTimeout(() => a$.next("a"), 400);
    setTimeout(() => b$.next("b"), 425);
    setTimeout(() => a$.next("a"), 500);
    setTimeout(() => b$.next("b"), 575);
    setTimeout(() => b$.next("b"), 700);
    setTimeout(() => a$.next("a"), 800);
    // Runs against RxJS 5: https://unpkg.com/rxjs@5/bundles/Rx.min.js

    If it's possible for b values to be closely followed by a values and you do not want them to be matched, you could use a more specific audit, like this:

    const audited$ = merged$.audit(x => x === "a" ?
      // If an `a` was received, audit upcoming values for `N` milliseconds.
      Rx.Observable.timer(N) :
      // If a `b` was received, don't audit the upcoming values.
      Rx.Observable.of(0, Rx.Scheduler.asap)
    );
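To see what grouping the two variants above produce without running RxJS, here is a rough plain-Python simulation. It is not a port to RxPY. It assumes events are `(time_ms, value)` tuples, that an event landing exactly when the audit timer fires belongs to the next buffer, and that the zero-length audit for `b` flushes before any later event arrives. The function name `simulate_buffers` and the flag `only_a_opens_window` are hypothetical.

```python
def simulate_buffers(events, window_ms, only_a_opens_window=False):
    """Group (time_ms, value) events roughly the way buffer(audit...) would.

    With only_a_opens_window=False every value can open a window of
    window_ms (the auditTime variant). With True, an "a" opens such a
    window, while a "b" arriving outside a window is flushed on its own
    (the audit variant).
    """
    buffers = []
    current = []
    window_end = None  # None means no audit timer is running
    for t, value in sorted(events, key=lambda e: e[0]):
        # The audit timer fired before this event: close the buffer.
        if window_end is not None and t >= window_end:
            buffers.append(current)
            current, window_end = [], None
        current.append(value)
        if window_end is None:
            if only_a_opens_window and value != "a":
                # Zero-length audit: flush this value immediately.
                buffers.append(current)
                current = []
            else:
                window_end = t + window_ms
    if current:
        buffers.append(current)
    return buffers


# A b closely followed by an a, then a matching b.
events = [(0, "b"), (10, "a"), (30, "b")]
print(simulate_buffers(events, 50))
# → [['b', 'a', 'b']]
print(simulate_buffers(events, 50, only_a_opens_window=True))
# → [['b'], ['a', 'b']]
```

Buffers of length two are matched pairs, and single-element buffers are the unmatched events, which can then be mapped to `(a, None)` or `(None, b)`.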