Tags: .net, exception, f#, system.reactive, combinators

Self-error-dependent self-disposal of IObservable subscriptions


I have F# code that looks like this:

module O = Control.Observable
//...
use ss = serve' 4000
         |> O.subscribe
            (fun c -> use cs = RSS.items
                               |> O.subscribe (bytes >> c.SendAll) |> ignore)

where

serve'    : int -> IObservable<Socket>
c         : Socket
RSS.items : IObservable<XElement>
bytes     : XElement -> byte []
c.SendAll : byte [] -> unit
  • What is the most idiomatic way to retain cs until c.SendAll fails?
  • Is there, or is it possible to define, an Observable.subscribeUntilError(action) such that the subscription is disposed if action fails, and action otherwise keeps running for as long as the IObservable keeps pushing values?

Solution

  • I have come up with this:

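    open System

    let inline Δ<'a> = Unchecked.defaultof<'a>
    let inline LOG x = printf "%A" x

    module Observable =
      // subscribeUE ("subscribe until error"): run action a on each value from o;
      // if a throws, log the exception and dispose the subscription.
      let subscribeUE (a: 'a -> unit) (o: IObservable<'a>) : IDisposable =
        let d = ref (Δ: IDisposable)
        // Parenthesised so that Dispose is only called on a non-null subscription.
        let dispose () = let s = !d in if s <> Δ then (d := Δ; s.Dispose())
        let b x = try a x with e -> LOG e; dispose ()
        d := o |> Observable.subscribe b
        { new IDisposable with member x.Dispose() = dispose () }

    module O = Observable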
    

    To see the difference, try the following in main.

    Using subscribe:

    use s = new Subject<int>()
    use x = s |> O.subscribe (fun _ -> raise <| Exception ())
    use y = s |> O.subscribe (printf "%i")
    s.OnNext 20
    

    application comes crashing down:

    Unhandled Exception: System.Exception: Exception of type 'System.Exception' was thrown.
       at Microsoft.FSharp.Core.Operators.Raise[T](Exception exn)
       at [email protected](Int32 _arg1) in C:\Eniox\Eniox.News.Google\Eniox.News.Google\Program.fs:line 60
       at Microsoft.FSharp.Control.CommonExtensions.SubscribeToObservable@1915.System-IObserver`1-OnNext(T value)
       at System.Reactive.Observer`1.OnNext(T value)
       at System.Reactive.Subjects.Subject`1.OnNext(T value)
       at Program.System.main(String[] args) in C:\Eniox\Eniox.News.Google\Eniox.News.Google\Program.fs:line 606
    

    Now using subscribeUE:

    use s = new Subject<int>()
    use x = s |> O.subscribeUE (fun _ -> raise <| Exception ())
    use y = s |> O.subscribe   (printf "%i")
    s.OnNext 20
    

    disposes subscription x, and the application continues to run without a hiccup and exits normally. Output with LOG = ignore:

    20
    

    I would love to know whether comparable functionality already exists somewhere in Rx 2.0, as I find this combinator too useful to leave out.
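
For anyone who wants to try the idea without the Rx package, here is a minimal sketch of the same combinator against FSharp.Core alone. The name subscribeUntilError and its log parameter are hypothetical (they are not part of Rx or FSharp.Core), and an F# Event stands in for the Subject used above. It assumes the source does not push values synchronously during subscription; a failure at that point would be logged, but the subscription would not yet be available to dispose.

```fsharp
open System

/// Hypothetical helper (not part of Rx or FSharp.Core): calls `action` for each
/// value; on the first exception it calls `log` and disposes the subscription.
let subscribeUntilError (log: exn -> unit) (action: 'a -> unit) (source: IObservable<'a>) : IDisposable =
    let inner = ref (null: IDisposable)
    let disposeInner () =
        let s = inner.Value
        if not (isNull s) then
            inner.Value <- null
            s.Dispose()
    inner.Value <-
        source
        |> Observable.subscribe (fun x ->
            try action x
            with e ->
                log e
                disposeInner ())
    { new IDisposable with member _.Dispose() = disposeInner () }

// Demo on an F# Event, whose Publish property is an IObservable<int>.
let ev = Event<int>()
let handled = ResizeArray<int>()   // values seen by the failing subscriber
let all = ResizeArray<int>()       // values seen by a healthy subscriber

let x =
    ev.Publish
    |> subscribeUntilError ignore (fun v ->
        if v = 2 then failwith "boom" else handled.Add v)
let y =
    ev.Publish
    |> Observable.subscribe (fun v -> all.Add v)

ev.Trigger 1
ev.Trigger 2   // x's action throws here, so x is disposed
ev.Trigger 3   // only y still receives this value

printfn "%A %A" (List.ofSeq handled) (List.ofSeq all)   // prints: [1] [1; 2; 3]
x.Dispose()
y.Dispose()
```

Disposal goes through a single disposeInner function, so disposing x again after it has already disposed itself is a harmless no-op.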