Search code examples
system.reactivesynchronize

Synchronize multiple subscriptions in RX


Is it possible to force multiple RX subscriptions to different observables run serially (not simultaneously)?

I am aware that I can use EventLoopScheduler for that, but that will degrade performance because all handling will be done on a single thread.


Solution

  • If you mean to run one observable until OnCompleted then start the next, you should look into Concat. If you mean to have multiple different observables that are subscribed to at the same time, you could use Merge (if the semantics make sense for your scenario). If Merge is not appropriate, I would recommend using one of the standard thread synchronization methods (lock, Monitor, etc) in the observer methods or the EventLoopScheduler you already know about.

    EDIT Original answer preserved below

    Yes, it is possible to force serial observer execution. However, whether you need to or not depends on the observable. In general, hot observables will already run serially, whereas cold observables will not. This is a side-effect of the difference in the way hot and cold observables work. To make a cold observable hot, and thus make observers run serially, use Publish. Here's an example demonstrating the various behaviors.

    Sub Main()
        'hot observable, runs serially
        Dim trigger As New ObsEvent
        Dim eobs = Observable.FromEventPattern(Of ItemEventArgs(Of String))(
                        Sub(h) AddHandler trigger.Triggered, h,
                        Sub(h) RemoveHandler trigger.Triggered, h)
        Dim sub1 = eobs.Subscribe(Sub(v)
                                      Console.WriteLine("Starting event observer 1: {0}", v.EventArgs.Item)
                                      Thread.Sleep(2000)
                                      Console.WriteLine("Ending event observer 1")
                                  End Sub)
        trigger.Trigger("event trigger 1")
        Dim sub2 = eobs.Subscribe(Sub(v)
                                      Console.WriteLine("Starting event observer 2: {0}", v.EventArgs.Item)
                                      Thread.Sleep(2000)
                                      Console.WriteLine("Ending event observer 2")
                                  End Sub)
        trigger.Trigger("event trigger 2")
    
        Console.WriteLine()
        Console.WriteLine()
    
        'cold observable, runs "simultaneously"
        Dim tobs = Observable.Timer(TimeSpan.FromSeconds(5))
        sub1 = tobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting timer observer 1")
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending timer observer 1")
                              End Sub,
                              Sub(ex) Console.WriteLine("Error"),
                              Sub() Console.WriteLine("Observer 1 completed"))
        Thread.Sleep(500)
        sub2 = tobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting timer observer 2")
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending timer observer 2")
                              End Sub,
                              Sub(ex) Console.WriteLine("Error"),
                              Sub() Console.WriteLine("Observer 2 completed"))
    
        'cold observable turned hot, runs serially
        Dim pobs = tobs.Publish()
        sub1 = pobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting publish observer 1")
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending publish observer 1")
                              End Sub,
                              Sub(ex) Console.WriteLine("Error"),
                              Sub() Console.WriteLine("Observer P1 completed"))
        Thread.Sleep(500)
        sub2 = pobs.Subscribe(Sub(v)
                                  Console.WriteLine("Starting publish observer 2")
                                  Thread.Sleep(2000)
                                  Console.WriteLine("Ending publish observer 2")
                              End Sub,
                              Sub(ex) Console.WriteLine("Error"),
                              Sub() Console.WriteLine("Observer P2 completed"))
        pobs.Connect()
    
        Console.ReadKey()
    End Sub