Search code examples
scalaobserver-patternobservablerx-java

Add elements after creation of rx Observable


How can I implement scenario when I want to add elements after creation of the Observable, can it be done at all? In the Observer pattern I would just fire event or so. Do you have some ideas?

import rx.lang.scala._

val target  = Observable(1,2,3,4)

val subscription1 = target subscribe(println(_))
val subscription2 = target subscribe(println(_))


def addToObservable(toAdd: Int, target: Observable[Int]): Observable[Int] = {
  target/*.addElementAndNotifyObservers(toAdd)*/
}

addToObservable(4, target) //should print 4 on all subscriptions
addToObservable(6, target) //should print 6 on all subscriptions

Solution

  • You can't - not to the observable you created. What you need is a Subject, using which you can emit values. Subject is basically both an Observable and an Observer.

    For example:

    import rx.lang.scala._
    import rx.lang.scala.subjects._
    
    val subject = ReplaySubject[Int]()             
    val initial = Observable(1,2,3,4)     
    val target = initial ++ subject   // concat the observables
    
    val subscription1 = target subscribe(println(_))
    val subscription2 = target subscribe(println(_))
    
    subject.onNext(4)    // emit '4'
    subject.onNext(6)    // emit '6'