Reactive Programming using RxScala


I have an Observable that connects to a service over a socket protocol. The connection to the socket is made through a client library. The client library that I use exposes a java.util.Observer with which I can register for the events being pushed to it.

final class MyObservable extends Observable[MyEvent] {

  def subscribe(subscriber: Subscriber[MyEvent]) = {
    // connect to the Socket (Step: 1)
    // get the responses that are pushed (Step: 2)
    // transform them into MyEvent type (Step: 3)
  }
}

I have two open questions:

How can I get the result of Step: 3 in my Subscriber?

Every time I receive a MyEvent with a subscriber like the one below, I see that a new connection is created. In effect, Step 1, Step 2 and Step 3 are run again for each incoming event.

val myObservable = new MyObservable()
myObservable.subscribe()

Solution

  • Unless I'm misunderstanding your question, you just call onNext:

    def subscribe(subscriber: Subscriber[MyEvent]) = {
      // connect to the Socket (Step: 1)
      // get the responses that are pushed (Step: 2)
      // transform them into MyEvent type (Step: 3)
    
      // finally notify the subscriber:
      subscriber.onNext(myEventFromStep3)
    }
    

    and the code that subscribes would look something like:

    myObservable.subscribe(onNext = println(_))
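
On the second question (a new connection appearing again and again), one possible explanation is that the observable is cold: the subscribe body, including Step 1, runs once per subscription. The following is a minimal sketch, not the asker's actual code. It assumes RxScala 0.26.x (`io.reactivex %% rxscala`) on the classpath, and `FakeClient`, `EventSource`, `collectInto` and the `payload` field of `MyEvent` are hypothetical names invented for illustration. It wraps a `java.util.Observer` callback with `Observable.apply` and uses `share` so that several subscribers reuse one connection.

```scala
import rx.lang.scala.{Observable, Subscription}
import scala.collection.mutable.ListBuffer

// Hypothetical event type; the real MyEvent is not shown in the question.
final case class MyEvent(payload: String)

// Hypothetical stand-in for the client library: a java.util.Observable
// that counts connections and lets the demo push raw messages by hand.
final class FakeClient extends java.util.Observable {
  var connects = 0
  def connect(): Unit = connects += 1
  def push(raw: String): Unit = {
    setChanged()          // protected in java.util.Observable, so called from the subclass
    notifyObservers(raw)  // delivers raw to every registered java.util.Observer
  }
}

object EventSource {
  // Cold observable: this body runs once per subscription to the returned Observable.
  def events(client: FakeClient): Observable[MyEvent] =
    Observable[MyEvent] { subscriber =>
      client.connect()                              // Step 1
      val listener = new java.util.Observer {
        override def update(o: java.util.Observable, arg: Any): Unit =
          if (!subscriber.isUnsubscribed)
            subscriber.onNext(MyEvent(arg.toString)) // Steps 2 and 3
      }
      client.addObserver(listener)
      // Detach the listener when the downstream unsubscribes.
      subscriber.add(Subscription { client.deleteObserver(listener) })
    }
}

object SharedDemo extends App {
  // Small helper so each subscriber records what it receives.
  def collectInto(buf: ListBuffer[MyEvent]): MyEvent => Unit = e => buf += e

  val client = new FakeClient
  // share multicasts one upstream subscription to all current subscribers.
  val shared = EventSource.events(client).share

  val seenByA = ListBuffer.empty[MyEvent]
  val seenByB = ListBuffer.empty[MyEvent]
  shared.subscribe(collectInto(seenByA))
  shared.subscribe(collectInto(seenByB))

  client.push("hello")
  client.push("world")

  println(s"connections: ${client.connects}")
  println(s"A saw: $seenByA, B saw: $seenByB")
}
```

Dropping `.share` in this sketch would make each `subscribe` call run the body again, so `connect()` would be called once per subscriber. Whether that matches the behaviour in the question depends on how the real client and subscribers are wired up.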