
Overlapping Buffer with Closing Selector


I have a stream of mouse events that emits while the user drags the cursor with the mouse button held down. I want to perform a pairwise operation on these events, which I can do using Buffer, e.g.,

var subscription = mouseDragging
    .Buffer(2, 1)
    .Subscribe(buffered => { /* Do stuff */ });

I also have access to another observable that emits when the dragging operation stops (user is no longer clicking the button). When this happens I want the buffer window to close even if there is only one item in the buffer (I basically need to flush the buffer in preparation for the next stream of drag events and not leave 'residual' items in there). How can I do this?

I see possible options as:

  1. Use TakeUntil to complete the buffered stream when dragging completes, something like

    var subscription = mouseDragging
        .Buffer(2, 1)
        .TakeUntil(dragComplete)
        .Subscribe(buffered => { /* Do stuff */ });
    

but to use the above I would need to re-subscribe on completion - not sure how I do this?

  2. Somehow define a closing selector observable in conjunction with the buffer count, something like this (obviously not correct)

    var subscription = mouseDragging
        .Buffer(2, 1, dragComplete)
        .Subscribe(buffered => { /* Do stuff */ });
    
  3. Some other way I haven't even thought of?


Solution

  • You're looking for the .Switch() operator. This operator takes an IObservable<IObservable<T>> and turns it into an IObservable<T> by subscribing to the inner observable whenever the outer one produces a value, while at the same time unsubscribing from the previous inner observable.

    This is how you get rid of the "residual" events you're talking about.

    I'm assuming you are starting with this:

    IObservable<MouseEventArgs> mouseDragging = ...;
    IObservable<DragEventArgs> dragComplete = ...;
    

    Then you need this intermediate query:

    IObservable<IObservable<IList<MouseEventArgs>>> source =
        dragComplete
            .StartWith((DragEventArgs)null)
            .Select(_ => mouseDragging.Buffer(2, 1));
    

    Now it's simple to subscribe to:

    IDisposable subscription =
        source
            .Switch()
            .Subscribe(buffered => { /* Do stuff */ });
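
    To see what the Switch does to a pending buffer, here is a minimal console sketch of the same query. It assumes the System.Reactive NuGet package is referenced, and it uses hypothetical stand-ins for the real event streams: a Subject<int> in place of mouseDragging and a Subject<Unit> in place of dragComplete, so that events can be pushed by hand.

    ```csharp
    using System;
    using System.Reactive;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    class Program
    {
        static void Main()
        {
            // Hypothetical stand-ins for the real (hot) event streams.
            var mouseDragging = new Subject<int>();
            var dragComplete = new Subject<Unit>();

            IDisposable subscription =
                dragComplete
                    .StartWith(Unit.Default)                  // start the first buffer immediately
                    .Select(_ => mouseDragging.Buffer(2, 1))  // a fresh pairwise buffer per drag
                    .Switch()                                 // drop the old buffer, follow the new one
                    .Subscribe(b => Console.WriteLine(string.Join(",", b)));

            mouseDragging.OnNext(1);
            mouseDragging.OnNext(2);            // first pair: 1,2
            mouseDragging.OnNext(3);            // second pair: 2,3
            dragComplete.OnNext(Unit.Default);  // the pending single item (3) is discarded
            mouseDragging.OnNext(10);
            mouseDragging.OnNext(11);           // pairing restarts: 10,11

            subscription.Dispose();
        }
    }
    ```

    Note that the leftover single-item buffer is dropped rather than emitted, because Switch unsubscribes from the previous inner Buffer instead of completing it, so the next drag starts with an empty buffer.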