When using GRPC whenever we declare streaming api
rpc heartBeat(Empty) returns (stream ServiceStatus){}
we have googles simple interface for observer pattern StreamObserver
(this is what protobuf will generate for us)
public interface StreamObserver<V> {
void onNext(V var1);
void onError(Throwable var1);
void onCompleted();
}
Now what you want to do is to convert this to an actual Observable
and only after that pass it for further use.
override fun heartBeat(arg: Empty): Observable<ServiceStatus> {
// we create rx java subject
val subject = PublishSubject.create<ServiceStatus>()
// we create grpc observer and delegate all calls to rx java
val observer = object : StreamObserver<ServiceStatus> {
override fun onNext(value: ServiceStatus) {
subject.onNext(value)
}
override fun onError(error: Throwable) {
subject.onError(error)
}
override fun onCompleted() {
subject.onCompleted()
}
}
// we use grpc observer for generated api
asyncStub.heartBeat(arg, observer)
// but we pass rx observable (subject) to client code
return subject
}
now I'm new to Kotlin but i cant figure out with existing delegation functionalities is there a way to make Subject delegate of StreamObserver? Is there a more expressive way to write this piece of code in Kotlin?
I would create a generic method that creates a StreamObserver
, passes it to its lambda argument and wraps the result in Observable
.
inline fun <T> asObservable(
crossinline body: (StreamObserver<T>) -> Unit): Observable<T> {
return Observable.create { subscription ->
val observer = object : StreamObserver<T> {
override fun onNext(value: T) {
subscription.onNext(value)
}
override fun onError(error: Throwable) {
subscription.onError(error)
}
override fun onCompleted() {
subscription.onCompleted()
}
}
body(observer)
}
}
Then you can implement RPC methods in the following way.
override fun heartBeat(arg: Empty): Observable<ServiceStatus> =
asObservable { asyncStub.heartBeat(arg, it) }