
Kotlin: delegating grpc.StreamObserver to rx.PublishSubject


When using gRPC, whenever we declare a streaming API such as

rpc heartBeat(Empty) returns (stream ServiceStatus){}

we get Google's simple observer-pattern interface, StreamObserver (the stubs generated from the proto file use it):

public interface StreamObserver<V> {
  void onNext(V var1);

  void onError(Throwable var1);

  void onCompleted();
}

What you want to do is convert this into an actual Observable and only then hand it on to client code:

override fun heartBeat(arg: Empty): Observable<ServiceStatus> {
    // we create rx java subject
    val subject = PublishSubject.create<ServiceStatus>()

    // we create grpc observer and delegate all calls to rx java
    val observer = object : StreamObserver<ServiceStatus> {

        override fun onNext(value: ServiceStatus) {
            subject.onNext(value)
        }

        override fun onError(error: Throwable) {
            subject.onError(error)
        }

        override fun onCompleted() {
            subject.onCompleted()
        }
    }

    // we use grpc observer for generated api
    asyncStub.heartBeat(arg, observer)

    // but we pass rx observable (subject) to client code
    return subject
}

I'm new to Kotlin, and I can't figure out whether its existing delegation features offer a way to make the Subject a delegate of StreamObserver. Is there a more expressive way to write this piece of code in Kotlin?


Solution

  • I would create a generic method that creates a StreamObserver, passes it to its lambda argument, and wraps the result in an Observable.

    inline fun <T> asObservable(
        crossinline body: (StreamObserver<T>) -> Unit): Observable<T> {
      return Observable.create { subscription ->
        val observer = object : StreamObserver<T> {
          override fun onNext(value: T) {
            subscription.onNext(value)
          }
    
          override fun onError(error: Throwable) {
            subscription.onError(error)
          }
    
          override fun onCompleted() {
            subscription.onCompleted()
          }
        }
        body(observer)
      }
    }
    

    Then you can implement RPC methods in the following way.

    override fun heartBeat(arg: Empty): Observable<ServiceStatus> =
        asObservable { asyncStub.heartBeat(arg, it) }
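
On the delegation part of the question: Kotlin's `by` class delegation needs a delegate that already implements the target interface, and the Rx subject implements Rx's own observer interface rather than StreamObserver, so some adapter is still required. A minimal sketch of such an adapter as an extension function follows. To keep it compilable without grpc-java or RxJava on the classpath, `StreamObserver` and `RxObserver` are local stand-ins, and `asStreamObserver`, `RecordingObserver` and `fakeHeartBeat` are hypothetical names introduced only for illustration.

```kotlin
// Stand-in mirroring the StreamObserver interface quoted in the question.
interface StreamObserver<V> {
    fun onNext(value: V)
    fun onError(error: Throwable)
    fun onCompleted()
}

// Hypothetical stand-in for the Rx observer interface that a subject implements.
interface RxObserver<T> {
    fun onNext(value: T)
    fun onError(error: Throwable)
    fun onCompleted()
}

// Hypothetical adapter: forwards every gRPC callback to the Rx-style observer.
fun <T> RxObserver<T>.asStreamObserver(): StreamObserver<T> {
    val target = this
    return object : StreamObserver<T> {
        override fun onNext(value: T) = target.onNext(value)
        override fun onError(error: Throwable) = target.onError(error)
        override fun onCompleted() = target.onCompleted()
    }
}

// Records the events it receives so the forwarding can be inspected.
class RecordingObserver<T> : RxObserver<T> {
    val events = mutableListOf<String>()
    override fun onNext(value: T) { events += "next:$value" }
    override fun onError(error: Throwable) { events += "error:${error.message}" }
    override fun onCompleted() { events += "completed" }
}

// Fake streaming call standing in for asyncStub.heartBeat(arg, observer).
fun fakeHeartBeat(observer: StreamObserver<String>) {
    observer.onNext("UP")
    observer.onNext("DEGRADED")
    observer.onCompleted()
}

fun main() {
    val recorder = RecordingObserver<String>()
    fakeHeartBeat(recorder.asStreamObserver())
    println(recorder.events) // prints [next:UP, next:DEGRADED, completed]
}
```

With the real libraries, the same extension could presumably be declared on Rx's observer type, so that the anonymous object in the question shrinks to something like `asyncStub.heartBeat(arg, subject.asStreamObserver())`.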