Search code examples
scalaobservablereactive-programmingscala-3monix

Create multicast Monix Observable that can restart when new subscriber subscribes


I'm trying to configure the behavior of a Monix Observable to meet a particular set of requirements.

Basically I'm trying to get it so:

  • The observable is multicast (like a hot observable)
  • The observable stops running when all subscribers unsubscribe (Like .refCount)
  • BUT, if a new subscriber subscribes, then the Observable restarts again (Like a cold observable)

Here's my code:

import monix.reactive.Observable
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global

object Main {
  def main(args: Array[String]): Unit = {
    val observable = Observable.interval(1.second).map{i => println("Obs running"); i}.share

    val subscription1 = observable.foreach(n => println(s"sub1: $n"))

    Thread.sleep(2000)

    val subscription2 = observable.foreach(n => println(s"sub2: $n"))

    Thread.sleep(5000)

    subscription1.cancel()
    println("Cancel 1")

    Thread.sleep(3000)

    subscription2.cancel()
    println("Cancel 2")
    // All subscribers unsubscribed, observable stops emitting values and is disposed of

    println("Sleep")
    Thread.sleep(5000)
    // After some time, a new subscriber comes along
    val subscription3 = observable.foreach(n => println(s"sub3: $n"))
    println("Subscribed to 3")
    Thread.sleep(7000)
  }
}

SBT:

scalaVersion := "3.4.0"
libraryDependencies += "io.monix" %% "monix-reactive" % "3.4.1"

Here's the output of my script:

Obs running
sub1: 0
Obs running
sub1: 1
Obs running
sub1: 2
Obs running
sub1: 3
sub2: 3
Obs running
sub1: 4
sub2: 4
Obs running
sub1: 5
sub2: 5
Obs running
sub1: 6
sub2: 6
Obs running
sub1: 7
sub2: 7
Cancel 1
Obs running
sub2: 8
Obs running
sub2: 9
Cancel 2
Sleep
Subscribed to 3

By using .share I was able to get the observable to be multicast and to stop firing once sub1 and sub2 unsubscribe (because .share uses .refCount under the hood), but when sub3 subscribes it never receives any items.

Also note this is a simplified example. In my real use case the Observable isn't just a list of ints, but it's running a query on an external system and returning the results back, but the issue is essentially the same.

Would greatly appreciate any help on this. Thanks!


Solution

  • After an intense jam session with Claude Opus, we were able to come up with a solution:

    import monix.execution.atomic.{Atomic, AtomicInt}
    import monix.execution.{Ack, Cancelable, Scheduler}
    import monix.reactive.Observable
    import monix.reactive.observers.Subscriber
    import monix.reactive.subjects.PublishSubject
    
    import scala.concurrent.Future
    
    class RestartableRefCountObservable[A] private (source: Observable[A]) extends Observable[A] {
      private var sourceConnection: Cancelable = null
      private val subject: PublishSubject[A] = PublishSubject()
      private val connectionCount = Atomic(0)
    
      override def unsafeSubscribeFn(subscriber: Subscriber[A]): Cancelable = {
    
        def startConnection(): Unit = {
          sourceConnection = source.subscribe(new Subscriber[A] {
            override def onNext(elem: A): Future[Ack] = subject.onNext(elem)
    
            override def onError(ex: Throwable): Unit = {
              sourceConnection.cancel()
              subject.onError(ex)
            }
    
            override def onComplete(): Unit = {
              sourceConnection.cancel()
              subject.onComplete()
            }
    
            override implicit def scheduler: Scheduler = subscriber.scheduler
          })
        }
    
        if (connectionCount.incrementAndGet() == 1) startConnection()
    
        val subjectSubscription = subject.unsafeSubscribeFn(subscriber)
    
        Cancelable { () =>
          subjectSubscription.cancel()
          if (connectionCount.decrementAndGet() == 0) sourceConnection.cancel()
        }
      }
    }
    
    object RestartableRefCountObservable {
      def apply[A](source: Observable[A]): RestartableRefCountObservable[A] =
        new RestartableRefCountObservable(source)
    }
    

    The solution was to implement a new class that extends Observable and that internally keeps track of the number of subscribers it has. Upon the first connection it will connect to the source Observable and after all subscribers have disconnected it will close the connection to the source. The crucial difference between monix.reactive.observables.RefCountObservable and this is that this will restart if another subscriber subscribes after all previous subscribers have unsubscribed.

    Here's an updated version of the script I was using to test before:

    import monix.execution.Scheduler.Implicits.global
    import monix.reactive.Observable
    
    import scala.concurrent.duration.*
    
    object Main {
      def main(args: Array[String]): Unit = {
        val source = Observable.interval(1.second).map(i => {
          println(s"Obs running: $i")
          i
        })
    
        val restartable = RestartableRefCountObservable(source)
    
        println("Sub1")
        val sub1 = restartable.foreach(i => println(s"Sub1: $i"))
        Thread.sleep(3000)
        println("Sub2")
        val sub2 = restartable.foreach(i => println(s"Sub2: $i"))
    
        Thread.sleep(5000)
    
        sub1.cancel()
        println("Sub1 canceled")
    
        Thread.sleep(3000)
        sub2.cancel()
        println("Sub2 canceled")
        Thread.sleep(10000)
    
        println("Sub3")
        val sub3 = restartable.foreach(i => println(s"Sub3: $i"))
    
        Thread.sleep(15000)
      }
    }
    

    and here's the new output:

    Sub1
    Obs running: 0
    Sub1: 0
    Obs running: 1
    Sub1: 1
    Obs running: 2
    Sub1: 2
    Obs running: 3
    Sub1: 3
    Sub2
    Sub2: 3
    Obs running: 4
    Sub1: 4
    Sub2: 4
    Obs running: 5
    Sub1: 5
    Sub2: 5
    Obs running: 6
    Sub1: 6
    Sub2: 6
    Obs running: 7
    Sub1: 7
    Sub2: 7
    Obs running: 8
    Sub1: 8
    Sub2: 8
    Sub1 canceled
    Obs running: 9
    Sub2: 9
    Obs running: 10
    Sub2: 10
    Obs running: 11
    Sub2: 11
    Sub2 canceled
    Sub3
    Obs running: 0
    Sub3: 0
    Obs running: 1
    Sub3: 1
    Obs running: 2
    Sub3: 2
    Obs running: 3
    Sub3: 3
    Obs running: 4
    Sub3: 4
    Obs running: 5
    Sub3: 5
    Obs running: 6
    Sub3: 6
    Obs running: 7
    Sub3: 7
    Obs running: 8
    Sub3: 8
    Obs running: 9
    Sub3: 9
    Obs running: 10
    Sub3: 10
    Obs running: 11
    Sub3: 11
    Obs running: 12
    Sub3: 12
    Obs running: 13
    Sub3: 13
    Obs running: 14
    Sub3: 14