I'm having trouble figuring out how to correctly wire in an Observables onNext() call with a SingleInterop. I'm also not sure how to return the expected result without blocking the main thread. My rough attempt is shown below. Any help would be appreciated.
public class SomeClass {
private ExecutorService executor = Executors.newSingleThreadExecutor;
private ObservableOnSubscribe<MyCustomObj> disHandler;
public SomeClass() {
init();
}
/** Create an observable to listen to a data input stream **/
private void init() {
disHandler = emitter ->
executor.submit(() -> {
try {
while (true) {
MyCustomObj mco = readFromDataInputStream();
emitter.onNext(mco);
}
// Should never complete since always listening...
emitter.onComplete();
}
catch(Exception e) {
emitter.onError(e);
}
});
}
public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
// Eventually the disHandler will send an onNext() as
// a result of doSomethingToBackend() call.
Observable<String> observer = Observable.fromCallable(doSomethingToBackend(someObj));
observer.subscribe(item -> item);
return ???.to(SingleInterop.get());
}
}
public class GraphQLVertx {
public VertxDataFetcher<Single<String>> dataFetcherTest() {
return new VertxDataFetcher<>((env, future) -> {
try {
future.complete(someClass.invokeSomethingAndGetResponseFromHandler("Blah");
}
catch(Exception e) {
future.fail();
}
});
}
}
The observable has a helper method that returns the first emitted element as a single, or an error, so if you already have an Observable<String>
you can transform it to Single<String>
like this:
public Single<String> invokeSomethingAndGetResponseFromHandler(Object someObj) {
return Observable.fromCallable(doSomethingToBackend(someObj))
.singleOrError();
}