Tags: scala, reactive-programming, observable, subscriber, rx-scala

How to update an observable manually?


I'm new to ReactiveX and RxScala, and I can create an Observable like this:

val observable = Observable[String] { subscriber => 
    subscriber.onNext("something")
}

I can push new strings to the subscriber from inside Observable.apply.

Is it possible to update the observable from outside? I mean, is there a method like putNext:

observable.putNext("another string")

that lets me push new items into the existing observable?


Solution

  • If you want to manually control an Observable, you need to use a Subject.

    According to the ReactiveX documentation:

    A Subject is a sort of bridge or proxy that is available in some implementations of ReactiveX that acts both as an observer and as an Observable. Because it is an observer, it can subscribe to one or more Observables, and because it is an Observable, it can pass through the items it observes by reemitting them, and it can also emit new items.

    You can subscribe to a Subject, and you can also push new elements into it from outside, which gives you the manual control you are asking for.

    Example:

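    import rx.lang.scala.subjects.PublishSubject

    val subject = PublishSubject[String]()
    // Subscribe first: a PublishSubject only delivers items emitted after subscription.
    subject.subscribe(s => println(s))
    subject.onNext("one")
    subject.onNext("two")
    subject.onNext("three")
    subject.onCompleted()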
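
To connect this back to the putNext idea from the question, here is a minimal sketch, assuming RxScala is on the classpath. The names ManualObservableSketch and pushAndCollect are hypothetical and exist only for illustration. The subject is kept as the handle you push into, while consumers see it as a plain Observable[String]:

```scala
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject
import scala.collection.mutable.ListBuffer

object ManualObservableSketch {
  // Hypothetical helper: pushes `items` into a subject from outside and
  // returns what a subscriber of the exposed Observable received.
  def pushAndCollect(items: Seq[String]): List[String] = {
    // The subject is the write side; a Subject is also an Observable,
    // so it can be handed out under the read-only Observable type.
    val subject = PublishSubject[String]()
    val observable: Observable[String] = subject

    // Record every item the subscriber sees.
    val received = ListBuffer.empty[String]
    observable.subscribe(s => { received += s; () })

    // This plays the role of the putNext call from the question.
    items.foreach(subject.onNext)
    subject.onCompleted()

    received.toList
  }

  def main(args: Array[String]): Unit =
    println(pushAndCollect(Seq("something", "another string")))
}
```

Note that a PublishSubject does not replay earlier items: anything pushed before a subscriber attaches is not delivered to that subscriber. If late subscribers should also see past items, a ReplaySubject is probably a better fit.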