Search code examples
.netf#reactive-programmingsystem.reactive

How to wait for an observable to complete?


In Node.js, you can set up a server and the process won't terminate as long as the server is alive and active in the event loop. I am wondering if it would be possible to do something like that using reactive extensions? I know how to set up a named pipe and make use of it to communicate with the Node server in another process, but I am not sure how to keep the program from terminating via anything other than waiting for a keypress. I'd like to implement that server as a part of a reactive pipeline. Would it be possible to block the main thread until the pipeline completes?


Solution

  • open System
    open System.Threading
    open FSharp.Control.Reactive
    
    module Observable = 
        /// Blocks the thread until the observable completes. 
        let waitUnit on_next (o : IObservable<_>) =
            use t = new ManualResetEventSlim()
            use __ = Observable.subscribeWithCompletion on_next (fun _ -> t.Set()) o
            t.Wait()
    
    Observable.interval (TimeSpan.FromSeconds(1.0))
    |> Observable.take 3
    |> Observable.waitUnit (printfn "%i...")
    
    printfn "Done."
    

    The above is using the F# Rx bindings, but the C# version would be similar. The library itself has Observable.wait which is of type IObservable<'a> -> 'a, but that one has the disadvantage of throwing an exception on empty sequences and keeping the latest value in memory until the observable is disposed. This one has the right semantics if the final value is not important.

    I've looked under the hood and wait uses ManualResetEventSlim to block the thread it is on, so this should be fine.