Search code examples
vert.xvertx-eventbus

Does event_bus consumer support to run Async. method?


I am new for Vert.x async programming. I have event_bus consumer, once it receives a message, then it needs to call a Async. method async_method_to_access_database to get database records from Database. async_method_to_access_database will return a Future. The code looks like following.

Here I have one questions:

Inside event_bus consumer, could it run a Async. method? I doubt whether consumer handler supports to run Async. method? In Vertx, I understand Vertilce contains event loop, it supports Async. method, I am not sure if event_bus consumer handler supports it?

EventBus eb = vertx.eventBus();

MessageConsumer<String> consumer = eb.consumer("my_address");
consumer.handler(message -> {
  
    Future<List<DataRecord>> listFuture = async_method_to_access_database();

    listFuture.onSuccess(ar->{

        doSomething();

    });

});

Solution

  • You can have async code in the handler. The important part to understand that you have two ways of sending messages on the eventbus: request/publish.

    • With publish you just fire-and-forget
    • And with request you will register a handler wich waits until the consumer answers the message by calling message.reply().

    In both cases your async code in the consumer will be executed, but when you use send, you have the option to have additional logic on the sender side, (e.g: repeat on error, validate response, etc)

    EventBus eb = vertx.eventBus();
    
    // sender
    
    eb.request("my_address","testmessage", h -> {
      if(h.succeeded()){
        System.out.println("successful access to db");
      }else{
        System.out.println(h.cause()); 
      }
    });
    
    // consumer
    
    MessageConsumer<String> consumer = eb.consumer("my_address");
    consumer.handler(message -> {
      
        Future<List<DataRecord>> listFuture = async_method_to_access_database();
    
        listFuture.onComplete(ar->{
            if (ar.succeeded()) {
                message.reply("updated elements successfully" + listFuture.size());
            }
    
            message.fail(1,"critical error") // will trigger error in sender
        });
    
    });