Tags: java, multithreading, vert.x, vertx-verticle

Why is Vert.x worker verticle called from multiple threads concurrently?


My Vert.x (4.0.2) application, written in Java 11, uses some data-heavy verticles that cause latency spikes because they block the event loop for a moment. For this reason I wanted to deploy these verticles as worker verticles, so that the event loop and the other verticles are no longer blocked.

Unfortunately, my application now crashes, because the event handling inside the verticle is executed by multiple threads concurrently.

If I understand the Vert.x documentation correctly, this should not happen:

Worker verticle instances are never executed concurrently by Vert.x by more than one thread, but can [be] executed by different threads at different times.
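
To make that guarantee concrete, here is a minimal plain-Java sketch of "never concurrently, but possibly on different threads". It is not Vert.x's implementation; it follows the serial executor pattern from the `java.util.concurrent.Executor` Javadoc, and the names `SerialExecutorDemo`, `SerialExecutor`, and `peakConcurrency` are made up for illustration. Tasks are queued and handed to a shared pool one at a time, so any pool thread may run them, but never two at once.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

final class SerialExecutorDemo {

  /** Hypothetical helper: runs tasks one at a time on top of a shared pool. */
  static final class SerialExecutor implements Executor {
    private final Queue<Runnable> tasks = new ArrayDeque<>();
    private final Executor pool;
    private Runnable active;

    SerialExecutor(Executor pool) {
      this.pool = pool;
    }

    @Override
    public synchronized void execute(Runnable task) {
      // Wrap the task so the next one is scheduled only after this one returns.
      tasks.add(() -> {
        try {
          task.run();
        } finally {
          scheduleNext();
        }
      });
      if (active == null) {
        scheduleNext();
      }
    }

    private synchronized void scheduleNext() {
      active = tasks.poll();
      if (active != null) {
        pool.execute(active); // any pool thread may pick this up
      }
    }
  }

  /** Runs taskCount tasks through one SerialExecutor and returns the peak number running at once. */
  static int peakConcurrency(int taskCount) {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    SerialExecutor serial = new SerialExecutor(pool);
    AtomicInteger running = new AtomicInteger();
    AtomicInteger peak = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(taskCount);
    try {
      for (int i = 0; i < taskCount; i++) {
        serial.execute(() -> {
          int now = running.incrementAndGet();
          peak.accumulateAndGet(now, Math::max);
          running.decrementAndGet();
          done.countDown();
        });
      }
      done.await();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    } finally {
      pool.shutdown();
    }
    return peak.get();
  }

  public static void main(String[] args) {
    System.out.println("peak concurrency: " + peakConcurrency(200));
  }
}
```

If a worker verticle honours the documented guarantee, its handlers should behave like tasks submitted through such a serializing layer: the peak concurrency per instance stays at one even though the pool has several threads.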

I was able to reproduce the issue with a minimal example:

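import io.vertx.core.AbstractVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Promise;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import lombok.extern.slf4j.Slf4j;

@Slf4j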
public class WorkerTest extends AbstractVerticle {
  private static final String ADDRESS = "address";
  private volatile String currentThread = null;
  private long counter = 0;

  @Override
  public void start(final Promise<Void> startPromise) {
    vertx.eventBus().consumer(ADDRESS, this::handleMessage);
    startPromise.complete();
  }

  private void handleMessage(Message<Object> message) {
    final var _currentThread = this.currentThread;
    final var thisThread = Thread.currentThread().getName();

    if (_currentThread != null) {
      log.error(
          "concurrent callback: current thread={}, this thread={}", _currentThread, thisThread);
      return;
    }

    try {
      this.currentThread = thisThread;
      Thread.sleep(2);
      if (++counter % 100L == 0) {
        log.info("received {} messages (current thread: {})", counter, thisThread);
      }
    } catch (Exception e) {
      // deliberately ignored: the sleep only simulates work
    } finally {
      this.currentThread = null;
    }
  }

  public static void main(String[] args) {
    final Vertx vertx = Vertx.vertx();

    vertx.deployVerticle(
        new WorkerTest(),
        new DeploymentOptions().setWorker(true),
        result -> {
          if (result.failed()) {
            System.exit(1);
            return;
          }

          for (int i = 0; i < 1000; ++i) {
            vertx.eventBus().send(ADDRESS, "test");
          }
        });
  }
}

Executing this produces many error logs because handleMessage is called from multiple threads concurrently. If I deploy the verticle as a non-worker (standard) verticle, it works as intended.
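
One caveat about the detection logic itself: reading a volatile field and then writing it are two separate steps, so two threads can both see `null` and both proceed, and the check can under-report overlaps. A minimal sketch of an atomic variant, using `AtomicBoolean.compareAndSet` from the JDK, might look like this. `OverlapDetector`, `tryEnter`, `exit`, and `overlapCount` are hypothetical names, not part of Vert.x.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical helper, not part of Vert.x: flags overlapping entries into a guarded section. */
final class OverlapDetector {
  private final AtomicBoolean busy = new AtomicBoolean(false);
  private final AtomicLong overlaps = new AtomicLong();

  /** Returns true if the caller now owns the section, false if another caller is already inside. */
  boolean tryEnter() {
    // compareAndSet performs the check and the write as one atomic step.
    if (busy.compareAndSet(false, true)) {
      return true;
    }
    overlaps.incrementAndGet();
    return false;
  }

  /** Must only be called by the caller whose tryEnter() returned true. */
  void exit() {
    busy.set(false);
  }

  /** Number of rejected (overlapping) entry attempts seen so far. */
  long overlapCount() {
    return overlaps.get();
  }

  public static void main(String[] args) {
    OverlapDetector detector = new OverlapDetector();
    boolean first = detector.tryEnter();  // section is free
    boolean second = detector.tryEnter(); // section is still held by the first entry
    detector.exit();
    System.out.println("first=" + first + ", second=" + second
        + ", overlaps=" + detector.overlapCount());
  }
}
```

In the verticle, handleMessage would call `tryEnter()` first, log and return if it yields false, and otherwise do its work inside a try block with `exit()` in the finally clause.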

What am I doing wrong here?


Solution

  • Vert.x 4.0.2 seems to be the problem in your case. Using Vert.x 4.0.3 and the following code:

    
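    import io.vertx.core.AbstractVerticle;
    import io.vertx.core.DeploymentOptions;
    import io.vertx.core.Promise;
    import io.vertx.core.Vertx;
    import io.vertx.core.eventbus.Message;

    public class WorkerTest extends AbstractVerticle {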
        private static final String ADDRESS = "address";
    
        private volatile boolean handleMessageInExecution = false;
    
        public static void main(String[] args) {
            final Vertx vertx = Vertx.vertx();
    
            vertx.deployVerticle(
                    WorkerTest::new,
                    new DeploymentOptions()
                            .setInstances(2)
                            .setWorkerPoolSize(10)
                            .setWorker(true)
                    ,
                    result -> {
                        for (int i = 0; i < 100; ++i) {
                            vertx.eventBus().send(ADDRESS, "test " + i);
                        }
                    });
        }
    
        @Override
        public void start(final Promise<Void> startPromise) {
            vertx.eventBus().localConsumer(ADDRESS, this::handleMessage);
            startPromise.complete();
        }
    
        private void handleMessage(Message<String> message) {
            if (handleMessageInExecution) {
                // this should never happen, since each thread that sets this to true, will also set it to
                // false on exit.
                System.out.println(message.body() + " ERROR");
                return;
            }
    
            handleMessageInExecution = true; // this thread is now executing handleMessage
            System.out.println(message.body() + " START   " + Thread.currentThread());
    
            try {
                Thread.sleep(1); // block thread for a moment to simulate heavy load
            } catch (Exception e) {
                // ignore interruption
                e.printStackTrace();
            } finally {
                handleMessageInExecution = false; // we are done executing
                System.out.println(message.body() + " END     " + Thread.currentThread());
            }
        }
    }
    

    we see the following output, which is what we expect: each message is handled by one thread and runs from start to end without concurrency, and at most 2 messages are in flight at the same time because we have 2 instances:

    test 1 START   Thread[vert.x-worker-thread-2,5,main]
    test 0 START   Thread[vert.x-worker-thread-3,5,main]
    test 0 END     Thread[vert.x-worker-thread-3,5,main]
    test 1 END     Thread[vert.x-worker-thread-2,5,main]
    test 2 START   Thread[vert.x-worker-thread-3,5,main]
    test 3 START   Thread[vert.x-worker-thread-2,5,main]
    test 3 END     Thread[vert.x-worker-thread-2,5,main]
    test 2 END     Thread[vert.x-worker-thread-3,5,main]
    test 5 START   Thread[vert.x-worker-thread-2,5,main]
    test 4 START   Thread[vert.x-worker-thread-3,5,main]
    test 4 END     Thread[vert.x-worker-thread-3,5,main]
    test 6 START   Thread[vert.x-worker-thread-3,5,main]
    test 5 END     Thread[vert.x-worker-thread-2,5,main]
    test 7 START   Thread[vert.x-worker-thread-2,5,main]
    test 6 END     Thread[vert.x-worker-thread-3,5,main]
    test 8 START   Thread[vert.x-worker-thread-3,5,main]
    test 7 END     Thread[vert.x-worker-thread-2,5,main]
    test 9 START   Thread[vert.x-worker-thread-2,5,main]
    test 8 END     Thread[vert.x-worker-thread-3,5,main]
    test 10 START   Thread[vert.x-worker-thread-3,5,main]
    test 9 END     Thread[vert.x-worker-thread-2,5,main]
    test 11 START   Thread[vert.x-worker-thread-2,5,main]
    test 10 END     Thread[vert.x-worker-thread-3,5,main]
    ...
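
Rather than checking such output by eye, the "at most 2 in flight" claim can be verified with a small parser. The following is only a sketch: it assumes the `test <n> START|END <thread>` line format printed above and assumes that print order approximates execution order. `InFlightCheck` and `maxInFlight` are made-up names.

```java
import java.util.List;

final class InFlightCheck {

  /** Returns the largest number of messages that were between START and END at once. */
  static int maxInFlight(List<String> lines) {
    int inFlight = 0;
    int max = 0;
    for (String line : lines) {
      String[] tokens = line.trim().split("\\s+");
      if (tokens.length < 3) {
        continue; // skip blank or truncated lines such as "..."
      }
      if ("START".equals(tokens[2])) {
        inFlight++;
        max = Math.max(max, inFlight);
      } else if ("END".equals(tokens[2])) {
        inFlight--;
      }
    }
    return max;
  }

  public static void main(String[] args) {
    // First lines of the output shown above.
    List<String> sample = List.of(
        "test 1 START   Thread[vert.x-worker-thread-2,5,main]",
        "test 0 START   Thread[vert.x-worker-thread-3,5,main]",
        "test 0 END     Thread[vert.x-worker-thread-3,5,main]",
        "test 1 END     Thread[vert.x-worker-thread-2,5,main]",
        "test 2 START   Thread[vert.x-worker-thread-3,5,main]",
        "test 3 START   Thread[vert.x-worker-thread-2,5,main]",
        "test 3 END     Thread[vert.x-worker-thread-2,5,main]",
        "test 2 END     Thread[vert.x-worker-thread-3,5,main]",
        "...");
    System.out.println("max in flight: " + maxInFlight(sample));
  }
}
```

With 2 instances the result should not exceed 2; a higher value would mean one instance handled two messages at once.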