I'm using a library in a Vert.x application that returns the Project Reactor type Mono.
I have a verticle that receives this reactive type and is meant to send its content over the event bus to another verticle:
import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class HelperVerticle extends AbstractVerticle
{
    public static final String ADDRESS = "address_1";

    @Override
    public void start() throws Exception
    {
        vertx.eventBus().consumer(ADDRESS, this::consume);
    }

    private void consume(Message<Object> message)
    {
        Mono.delay(Duration.ofMillis(3000))
            .thenReturn("Content of Mono.") // this would come from external library
            .publishOn(Schedulers.fromExecutor(vertx.nettyEventLoopGroup())) // is this needed?
            .subscribe(output ->
            {
                System.out.println("My verticle: " + Thread.currentThread().getName());
                message.reply(output + " " + message.body());
            }, error -> message.fail(1, error.getMessage()));
    }
}
Is this the right approach? Should I switch to the Vert.x event loop thread pool before sending the message to the event bus? Is there anything I should be aware of when using these libraries together?
The code looks good to me, except that you shouldn't use the Netty event loop group as the executor; use the verticle context instead:
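import io.vertx.core.AbstractVerticle;
import io.vertx.core.eventbus.Message;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.time.Duration;

public class HelperVerticle extends AbstractVerticle
{
    public static final String ADDRESS = "address_1";

    private Scheduler scheduler;

    @Override
    public void start() throws Exception
    {
        // Every task handed to this scheduler is run on the verticle's own context.
        scheduler = Schedulers.fromExecutor(command -> context.runOnContext(v -> command.run()));
        vertx.eventBus().consumer(ADDRESS, this::consume);
    }

    private void consume(Message<Object> message)
    {
        Mono.delay(Duration.ofMillis(3000))
            .thenReturn("Content of Mono.") // this would come from external library
            .publishOn(scheduler) // switch back to the verticle context before replying
            .subscribe(output ->
            {
                System.out.println("My verticle: " + Thread.currentThread().getName());
                message.reply(output + " " + message.body());
            }, error -> message.fail(1, error.getMessage()));
    }
}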
With such a scheduler, you get the assurance that the verticle state will not be modified by a thread other than the event loop it has been assigned to.
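If it helps to see the idea in isolation, here is a rough JDK-only analogy of that executor adapter. It is only a sketch and uses neither Vert.x nor Reactor: the single-threaded executor named "fake-event-loop" stands in for the verticle context, the "library-thread" stands in for whatever thread the external library emits on, and the class and method names (ContextHopSketch, newFakeEventLoop, callbackThreadName) are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// JDK-only analogy; all names here are hypothetical and not part of Vert.x or Reactor.
class ContextHopSketch {

    // Stand-in for the verticle's event loop: a single named daemon thread.
    static ExecutorService newFakeEventLoop() {
        return Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "fake-event-loop");
            t.setDaemon(true);
            return t;
        });
    }

    // Completes a value on a foreign thread, hops to the given executor before
    // running the callback, and returns the name of the thread the callback ran on.
    static String callbackThreadName(Executor contextExecutor) throws Exception {
        CompletableFuture<String> fromLibrary = new CompletableFuture<>();
        Thread foreign = new Thread(() -> fromLibrary.complete("Content"), "library-thread");
        foreign.setDaemon(true);
        foreign.start();

        return fromLibrary
                .thenApplyAsync(value -> Thread.currentThread().getName(), contextExecutor)
                .get(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        ExecutorService eventLoop = newFakeEventLoop();
        // Same shape as command -> context.runOnContext(v -> command.run()) above.
        Executor contextExecutor = command -> eventLoop.execute(command);
        System.out.println("Callback ran on: " + callbackThreadName(contextExecutor));
        eventLoop.shutdown();
    }
}
```

The callback runs on the stand-in event loop thread no matter which thread completed the value, which is the same property publishOn(scheduler) gives the subscriber in the verticle.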