Tags: graphql, rx-java, rx-java2

Wiring an Observable to a SingleInterop in Vert.x


I'm having trouble figuring out how to correctly wire an Observable's onNext() call to a SingleInterop. I'm also not sure how to return the expected result without blocking the main thread. My rough attempt is shown below. Any help would be appreciated.

public class SomeClass {
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private ObservableOnSubscribe<MyCustomObj> disHandler;

  public SomeClass() {
    init();
  }

  /** Create an observable to listen to a data input stream **/
  private void init() {
    disHandler = emitter ->
      executor.submit(() -> {
        try {
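          // Keep reading until the subscriber disposes. A bare while (true)
          // would make the onComplete() call below unreachable, which javac rejects.
          while (!emitter.isDisposed()) {
            MyCustomObj mco = readFromDataInputStream();
            emitter.onNext(mco);
          }
          emitter.onComplete();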
        }
        catch(Exception e) {
          emitter.onError(e);
        }
      });
  }
  public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
    // Eventually the disHandler will send an onNext() as 
    // a result of doSomethingToBackend() call.

    Observable<String> observer = Observable.fromCallable(doSomethingToBackend(someObj));
    observer.subscribe(item -> item);

    return ???.to(SingleInterop.get());
  }
}

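public class GraphQLVertx {
  // The instance the data fetcher delegates to.
  private final SomeClass someClass = new SomeClass();

  public VertxDataFetcher<Single<String>> dataFetcherTest() {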
    return new VertxDataFetcher<>((env, future) -> {
      try {
        future.complete(someClass.invokeSomethingAndGetResponseFromHandler("Blah"));
      }
      catch(Exception e) {
        future.fail(e);
      }
    });
  }
}

Solution

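  • Observable has a helper method, singleOrError(), that converts it to a Single. It emits the source's only item, or signals an error if the source is empty or emits more than one item (firstOrError() is the variant that takes just the first of several items). So if you already have an Observable<String>, you can transform it to a Single<String> like this: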

    public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
        return Observable.fromCallable(doSomethingToBackend(someObj))
            .singleOrError();
    }
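
For the non-blocking half of the question, a minimal JDK-only sketch may help. As far as I can tell, SingleInterop.get() converts a Single into a CompletionStage, so the sketch models that end result directly with a CompletableFuture instead of RxJava. The class name ResponseBridge, the in-memory incoming queue standing in for the data input stream, and the echoing doSomethingToBackend are all hypothetical. It also assumes responses arrive in the same order the requests were sent.

```java
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

/** Sketch only: a listener thread completes pending futures as responses arrive. */
class ResponseBridge implements AutoCloseable {
  // Hypothetical stand-in for the data input stream from the question.
  private final BlockingQueue<String> incoming = new LinkedBlockingQueue<>();
  // Requests still awaiting a response, in send order (assumes in-order replies).
  private final Queue<CompletableFuture<String>> pending = new ConcurrentLinkedQueue<>();
  private final ExecutorService executor = Executors.newSingleThreadExecutor();

  ResponseBridge() {
    executor.submit(() -> {
      try {
        while (!Thread.currentThread().isInterrupted()) {
          String response = incoming.take();   // blocks only the listener thread
          CompletableFuture<String> waiter = pending.poll();
          if (waiter != null) {
            waiter.complete(response);         // hands the result to the caller's stage
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();    // shutdown was requested
      }
    });
  }

  /** Registers a waiter, sends the request, and returns without waiting. */
  synchronized CompletionStage<String> invoke(String request) {
    CompletableFuture<String> result = new CompletableFuture<>();
    pending.add(result);                       // register before sending to avoid a race
    doSomethingToBackend(request);
    return result;
  }

  // Hypothetical backend: it simply echoes the request onto the "stream".
  private void doSomethingToBackend(String request) {
    incoming.add("response to " + request);
  }

  @Override
  public void close() {
    executor.shutdownNow();                    // interrupts the blocked take()
  }
}
```

Calling invoke("Blah") returns right away, and the stage completes on the listener thread once the response has been read. Because nothing in invoke waits on the response, a data fetcher could presumably hand that stage back as-is. If replies can arrive out of order, the queue of waiters would need to be replaced by a lookup keyed on some correlation ID.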