Search code examples
javagraphqlvert.xgraphql-javagraphql-subscriptions

Vert.x - GraphQL Subscriptions with DataInputStreams


I have 3rd party code that I connect to via DataInputStream. The 3rd party code continually spits out information as it generates it. When something of interest comes across I want to pass it along to GraphQL Subscription(s)

I'm not sure how to wire the 3rd party code to the server-side GraphQL subscription code given this scenario. Any suggestions would be appreciated.

Some conceptual code is below:

public void liveStream(DataInputStream in) {
  // Sit and constantly watch input stream and report when messages come in
  while(true) {
    SomeMessage message = readFromInputStream(in);
    System.out.println("Received Message Type:" + message.getType());

    // Convert SomeMessage into the appropriate class based on its type
    if (message.getType() == "foo") {
      Foo foo = convertMessageToFoo(message);
    } else if (message.getType() == "bar") {
      Bar bar = convertMessageToBar(message);
    } else if (howeverManyMoreOfThese) {
      // Keep converting to different objects
    }
  }       
}

// The client code will eventually trigger this method when 
// the GraphQL Subscription query is sent over
VertxDataFetcher<Publisher<SomeClassTBD>> myTestDataFetcher() {
  return new VertxDataFetcher<> (env, future) -> {
    try {
      future.complete(myTest());
    } catch(Exception e) {
      future.fail(e);
    }
  });
}

Solution

  • OK, I wrapped my liveStream code in an ObservableOnSubscribe using an executorService and I'm getting back all the data. I guess I can now either pass it straight through to the front end or create separate publishers to deal with specific object types and have graphql subscriptions point to their respective publishers.

    ExecutorService executor = Executors.newSingleThreadExecutor;
    
    ObservableOnSubscribe<SomeClassTBD> handler = emitter ->
      executor.submit(() -> {
        try {
          //liveStream code here
          emitter.onComplete();
        }
        catch(Exception e) {
          emitter.onError(e);
        }
        finally {
          // Cleanup here
        }
      });
      Observable<SomeClassTBD> = Observable.create(handler);