I have an Observable that connects to a service via a socket protocol. The connection to the socket happens through a client library. The client library that I use exposes a java.util.Observer with which I can register for events being pushed into it.
final class MyObservable extends Observable[MyEvent] {
  def subscribe(subscriber: Subscriber[MyEvent]) = {
    // connect to the Socket (Step: 1)
    // get the responses that are pushed (Step: 2)
    // transform them into MyEvent type (Step: 3)
  }
}
I have two open questions:
How can I get the result of Step: 3 in my Subscriber?
Every time I receive a MyEvent with a subscriber like the one below, I see that a new connection is being created. In effect, Step 1, Step 2 and Step 3 are run again for each incoming event.
val myObservable = new MyObservable()
myObservable.subscribe()
Unless I'm misunderstanding your question, you just call onNext:
def subscribe(subscriber: Subscriber[MyEvent]) = {
  // connect to the Socket (Step: 1)
  // get the responses that are pushed (Step: 2)
  // transform them into MyEvent type (Step: 3)
  // finally notify the subscriber:
  subscriber.onNext(myEventFromStep3)
}
and code that subscribes would do something like:
myObservable.subscribe(onNext = println(_))
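To make the onNext call concrete, here is a minimal, dependency-free sketch of how the java.util.Observer callback could be bridged to the subscriber. Everything in it is an assumption made for illustration: FakeSocketClient stands in for your client library, the Subscriber trait is a simplified stand-in for the Rx Subscriber, and MyEvent is given a single hypothetical payload field. Note that java.util.Observable is deprecated since Java 9, so expect deprecation warnings.

```scala
import java.util.{Observable => JObservable, Observer => JObserver}
import scala.collection.mutable.ListBuffer

// Hypothetical event type and a minimal stand-in for Rx's Subscriber.
final case class MyEvent(payload: String)
trait Subscriber[T] { def onNext(value: T): Unit }

// Hypothetical stand-in for the client library: it pushes raw messages
// to every registered java.util.Observer.
final class FakeSocketClient extends JObservable {
  def push(raw: String): Unit = {
    setChanged()          // mark state as changed, otherwise nothing is notified
    notifyObservers(raw)
  }
}

final class MyObservable(client: FakeSocketClient) {
  def subscribe(subscriber: Subscriber[MyEvent]): Unit = {
    // Step 1 would open the connection here; the fake client needs none.
    client.addObserver(new JObserver {
      // Step 2: the library calls update for every pushed response.
      override def update(o: JObservable, arg: Any): Unit =
        // Step 3: transform the response, then hand it to the subscriber.
        subscriber.onNext(MyEvent(arg.toString))
    })
  }
}

object BridgeDemo {
  // Subscribes once, pushes two messages, and returns what the subscriber saw.
  def run(): List[MyEvent] = {
    val client = new FakeSocketClient
    val received = ListBuffer.empty[MyEvent]
    new MyObservable(client).subscribe(new Subscriber[MyEvent] {
      def onNext(value: MyEvent): Unit = received += value
    })
    client.push("hello")
    client.push("world")
    received.toList
  }
}
```

The key point is that onNext is called from inside the library's update callback, once per pushed response, while the registration itself happens once per call to subscribe.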