
Vertx.io cluster and service discovery


I am playing with Vert.x and it looks great. I have built a cluster of three verticles (three simple Java fat jars, each with its own main). One verticle exposes a web interface (a rudimentary REST API); the other two simply let the web verticle know whether they are up or down through Vert.x's service discovery mechanism. Here is the relevant part of my simple "non-web" verticle:

public class FileReader extends AbstractVerticle {

  private ServiceDiscovery discovery;
  private Logger log = LogManager.getLogger(getClass());
  private Record record;

  @Override
  public void start(Future<Void> startFuture) throws Exception {
    record = EventBusService.createRecord(getServiceName(), getServiceAddress(), getClass());
    setUpRecord(record);
    discovery = ServiceDiscovery.create(vertx);
    discovery.publish(record, h -> {
        if (h.succeeded()) {
            log.info("Record published.");
        } else {
            log.error("Record not published.", h.cause());
        }
    });
    startFuture.complete();
  }
  ...
  @Override
  public void stop(Future<Void> stopFuture) throws Exception {
    log.info("Stopping verticle.");
    discovery.unpublish(record.getRegistration(), h -> {
        if (h.succeeded()) {
            log.info("Service unpublished.");
            stopFuture.complete();
        } else {
            log.error(h.cause());
            stopFuture.fail(h.cause());
        }
    });
  }
}

And here is how I deploy one of the two "non-web" verticles:

public class FileReaderApp {

  private static Logger log = LogManager.getLogger(FileReaderApp.class);
  private static String id;

  public static void main(String[] args) {
    ClusterManager cMgr = new HazelcastClusterManager();
    VertxOptions vOpt = new VertxOptions(new JsonObject());
    vOpt.setClusterManager(cMgr);
    Vertx.clusteredVertx(vOpt, ch -> {
        if (ch.succeeded()) {
            log.info("Deploying file reader.");
            Vertx vertx = ch.result();
            vertx.deployVerticle(new FileReader(), h -> {
                if (h.succeeded()) {
                    id = h.result();
                } else {
                    log.error(h.cause());
                }
            });
        } else {
            log.error(ch.cause());
        }
    });

    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            log.info("Undeploying " + id);
            Vertx.vertx().undeploy(id, h -> {
                if (h.succeeded()) {
                    log.info("undeployed.");
                } else {
                    log.error(h.cause());
                }
            });
        }
    });
  }
}

When the "non-web" verticles start, the "web" verticle is correctly notified. But when I shut down a "non-web" verticle by pressing Ctrl-C, I get this error and the "web" verticle still thinks everyone is up:

2017-12-01 09:08:27 INFO  FileReader:31 - Undeploying 82a8f5c2-e6a2-4fc3-84ff-4bb095b5dc43
Exception in thread "Thread-3" java.lang.IllegalStateException: Shutdown in progress
at java.lang.ApplicationShutdownHooks.add(ApplicationShutdownHooks.java:66)
at java.lang.Runtime.addShutdownHook(Runtime.java:211)
at io.vertx.core.impl.FileResolver.setupCacheDir(FileResolver.java:310)
at io.vertx.core.impl.FileResolver.<init>(FileResolver.java:92)
at io.vertx.core.impl.VertxImpl.<init>(VertxImpl.java:185)
at io.vertx.core.impl.VertxImpl.<init>(VertxImpl.java:144)
at io.vertx.core.impl.VertxImpl.<init>(VertxImpl.java:140)
at io.vertx.core.impl.VertxFactoryImpl.vertx(VertxFactoryImpl.java:34)
at io.vertx.core.Vertx.vertx(Vertx.java:82)
at edu.foo.app.FileReaderApp$1.run(FileReaderApp.java:32)

I don't fully understand what's going on. Did the application shut down while it was still undeploying the verticle? How can I solve this? What is the Vert.x approach?


Solution

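  • There are two problems:

    1. You should undeploy the verticle using the clustered Vert.x instance that deployed it, not just any instance. Vertx.vertx() creates a brand-new, non-clustered instance that knows nothing about the deployment ID. As the stack trace shows, creating it also tries to register a shutdown hook of its own (in FileResolver.setupCacheDir), which the JVM rejects with "Shutdown in progress" once shutdown has begun.
    2. undeploy is a non-blocking operation, so the shutdown hook thread must wait for it to complete. Otherwise the JVM can exit before the verticle's stop method has unpublished the record.

    Here's a modified version: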

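    // Needs java.util.concurrent.CountDownLatch and java.util.concurrent.TimeUnit.
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            log.info("Undeploying " + id);
            CountDownLatch latch = new CountDownLatch(1);
            // theClusteredVertxInstance is the Vertx obtained from ch.result() in main
            theClusteredVertxInstance.undeploy(id, h -> {
                if (h.succeeded()) {
                    log.info("undeployed.");
                } else {
                    log.error(h.cause());
                }
                // Release the hook thread whether undeploy succeeded or failed
                latch.countDown();
            });
            try {
                // Block until undeploy has finished, for at most 5 seconds
                latch.await(5, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    });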
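
    The waiting pattern can be tried in isolation, without Vert.x on the classpath. The following is only a minimal sketch: undeployAsync is a hypothetical stand-in for a non-blocking call such as undeploy(id, handler), simulated with a scheduled executor, and the class name, method names, and delays are all assumptions made for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class ShutdownWaitSketch {

    // Hypothetical stand-in for a non-blocking operation: it returns
    // immediately and calls the handler later, on the worker thread.
    static void undeployAsync(ScheduledExecutorService worker,
                              long delayMillis,
                              Consumer<Boolean> handler) {
        worker.schedule(() -> handler.accept(true), delayMillis, TimeUnit.MILLISECONDS);
    }

    // Starts the simulated undeploy and blocks the calling thread until the
    // handler has run or the timeout expires. Returns true only if the
    // handler ran in time.
    static boolean undeployAndWait(long delayMillis, long timeoutMillis) {
        ScheduledExecutorService worker = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch latch = new CountDownLatch(1);
        try {
            undeployAsync(worker, delayMillis, ok -> latch.countDown());
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            // Preserve the interrupt status for the caller
            Thread.currentThread().interrupt();
            return false;
        } finally {
            // Stop the worker thread and cancel anything still pending
            worker.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The hook blocks until the simulated undeploy has completed
        Runtime.getRuntime().addShutdownHook(new Thread(() ->
            System.out.println("finished in time: " + undeployAndWait(100, 5000))));
    }
}
```

    The timeout matters: a shutdown hook that waits forever would keep the JVM from exiting if the callback never fires, so a bounded await like the 5 seconds above is the safer choice.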