Tags: scala, reactive-programming, rx-scala

Is there an Operation to block onComplete?


I am trying to learn reactive programming, so forgive me if I ask a silly question. I'm also open to advice on changing my design.

I am working in scala-swing to display the results of a simulator. With one setting, the chart is displayed as a histogram; with the other, it is displayed as the cumulative sum. (I'm probably using the wrong word: in the first setting you might have bin1 = 2, bin2 = 5, bin3 = 3; in the second setting the first height is 2, the second is 2 + 5, the third is 2 + 5 + 3, etc.) The simulator can be slow, so I originally used a Future to compute the data and then set it into the chart. I decided to try a reactive approach, with two requirements: 1. I don't want to recreate the data when I change the display mode, and 2. I want to set the Observable once for the chart and have the chart listen to that same Observable permanently.
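
To make the two display modes concrete, here is a minimal sketch using only the Scala standard library. The names `histogram` and `cumulative` are illustrative and do not come from the simulator code.

```scala
// Bin heights as shown in histogram mode (bin1 = 2, bin2 = 5, bin3 = 3).
val histogram: List[Int] = List(2, 5, 3)

// Running total for the cumulative mode: scanLeft yields 0, 2, 7, 10,
// and tail drops the initial seed value.
val cumulative: List[Int] = histogram.scanLeft(0)(_ + _).tail
```

Both views derive from the same data, which is why the data only needs to be generated once.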

I got this to work when I started the chain with a PublishSubject and the Future set the data into the start of the chain. When the display mode changed, I created a new PublishSubject().map(newRenderingLogic).subscribe(theChartsObservable). I am now trying to do what looks like the "right way," but it's not working correctly. I've tried to simplify what I have done:

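// Imports this snippet relies on (using the global ExecutionContext is an assumption)
import rx.lang.scala.{Observable, Subject, Subscription}
import rx.lang.scala.subjects.PublishSubject
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.swing.Button

val textObservable: Subject[String] = PublishSubject()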
textObservable.subscribe(text => {
  println(s"Text: ${text}")
})
var textSubscription: Option[Subscription] = None
val start = Observable.from(Future {
  "Base text"
}).cache

var i = 0

val button = new Button() {
  text = "Click"
  reactions += {
    case event => {
      i += 1
      if (textSubscription.isDefined) {
        textSubscription.get.unsubscribe()
      }
      textSubscription = Some(start.map(((j: Int) => { (base: String) => s"${base} ${j}" })(i)).subscribe(textObservable))
    }
  }
}

On start, an Observable (a PublishSubject) is created and logic to print some text is subscribed to it. Next, an Observable with the generated data is created, with cache added so that the result is replayed if a subscription comes in after the result has been generated. Finally, a button is created. On each click, a middle Observable is chained on with logic that is supposed to change per click (it's a function that creates a function to append the value of i to the string, applied to the current value of i; I tried to make something that couldn't just be reused). The first Observable is then subscribed to it, so that the results of the whole chain end up being printed.

In theory, the cache operation takes care of not regenerating the data, and this works once, but onComplete is called on textObservable and then it can't be used again. It works if I subscribe it like this:

textSubscription = Some(start.map(((j: Int) => { (base: String) => s"${base} ${j}" })(i)).subscribe(text => textObservable.onNext(text)))

because the call to onComplete is intercepted, but this looks wrong, and I wanted to know if there is a more typical way to do this or a better way to architect it. If there isn't an out-of-the-box operation for this, it makes me think that I don't understand how this is supposed to be done.

Thank you.


Solution

  • I'm not 100% sure if I got the essence of your question right, but: if you have an Observable that may complete and you want to turn it into an Observable that never completes, you can just concatenate it with Observable.never.

    For example:

    import rx.lang.scala.Observable

    // will complete after emitting those three elements:
    val completes = Observable.from(List(1, 2, 3))
    // will emit those three elements, but will never complete:
    val wontComplete = completes ++ Observable.never
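
Applied to the setup in the question, the same idea might look like the sketch below. It assumes RxScala is on the classpath. The names `sink`, `received`, `completed`, `current` and `rerender` are hypothetical, and a synchronous `Observable.just` stands in for the Future-backed source so that the sketch stays deterministic.

```scala
import rx.lang.scala.{Observable, Subject, Subscription}
import rx.lang.scala.subjects.PublishSubject
import scala.collection.mutable.ListBuffer

// Long-lived sink, standing in for the chart's Observable in the question.
val sink: Subject[String] = PublishSubject[String]()
val received = ListBuffer.empty[String]
var completed = false
sink.subscribe(
  (text: String) => { received += text; () },
  (_: Throwable) => (),
  () => completed = true
)

// Stand-in for the cached simulator result (assumption: synchronous source).
val start: Observable[String] = Observable.just("Base text").cache

var current: Option[Subscription] = None

// Hypothetical helper: swap in new rendering logic, as a button click would.
def rerender(i: Int): Unit = {
  current.foreach(_.unsubscribe())
  // Appending Observable.never means onCompleted is never forwarded to the sink.
  val chain = start.map(base => s"$base $i") ++ Observable.never
  current = Some(chain.subscribe(sink))
}

rerender(1)
rerender(2)
```

With this arrangement the sink should receive one string per call to `rerender` and stay usable, because the chain it is subscribed to never signals completion. Errors from the source would still be forwarded to the sink, so those would need separate handling.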