Search code examples
hazelcastvert.xevent-bus

How to setup a distributed eventbus in a vertx Hazelcast Cluster?


Here is the sender verticle I have set multicast enabled and set the public host to my machines ip address

VertxOptions options = new VertxOptions()
                .setClusterManager(ClusterManagerConfig.getClusterManager());
       EventBusOptions eventBusOptions = new EventBusOptions()
               .setClustered(true)
               .setClusterPublicHost("10.10.1.160");

        options.setEventBusOptions(eventBusOptions);



        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                Vertx vertx = res.result();
                vertx.deployVerticle(new requestHandler());
                vertx.deployVerticle(new requestSender());
             
                EventBus eventBus = vertx.eventBus();
                eventBus.send("some.address","hello",reply -> {
                    System.out.println(reply.toString());
                   
                });


            } else {
                LOGGER.info("Failed: " + res.cause());
            }
        });
    }

here's the reciever verticle


VertxOptions options = new VertxOptions().setClusterManager(mgr);
        options.setEventBusOptions(new EventBusOptions()
                .setClustered(true)
                .setClusterPublicHost("10.10.1.174") );



        Vertx.clusteredVertx(options, res -> {
            if (res.succeeded()) {
                Vertx vertx1 = res.result();

                System.out.println("Success");
                EventBus eb = vertx1.eventBus();
                System.out.println("ready");
                eb.consumer("some.address", message -> {
                  
                    message.reply("hello hello");
                });
            } else {
                System.out.println("Failed");
            }
        });

I get this result when i run both main verticles , so the verticles are detected by hazelcast and a connection is established

INFO: [10.10.1.160]:33001 [dev] [3.10.5] Established socket connection between /10.10.1.160:33001 and /10.10.1.174:35725
Jan 11, 2021 11:45:10 AM com.hazelcast.internal.cluster.ClusterService
INFO: [10.10.1.160]:33001 [dev] [3.10.5] 

Members {size:2, ver:2} [
    Member [10.10.1.160]:33001 - 51b8c249-6b3c-4ca8-a238-c651845629d8 this
    Member [10.10.1.174]:33001 - 1cba1680-025e-469f-bad6-884111313672
]

Jan 11, 2021 11:45:10 AM com.hazelcast.internal.partition.impl.MigrationManager
INFO: [10.10.1.160]:33001 [dev] [3.10.5] Re-partitioning cluster data... Migration queue size: 271
Jan 11, 2021 11:45:11 AM com.hazelcast.nio.tcp.TcpIpAcceptor

But when the event-bus tries to send a message to given address i encounter this error is this a problem with event-bus configuration?

Jan 11, 2021 11:59:57 AM io.vertx.core.eventbus.impl.clustered.ConnectionHolder
WARNING: Connecting to server 10.10.1.174:39561 failed
io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /10.10.1.174:39561
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
    at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:327)
    at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:340)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:665)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:612)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:529)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:491)
    at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:905)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.net.ConnectException: Connection refused
    ... 11 more

Solution

  • In Vert.x 3, the cluster host and cluster public host default to localhost.

    If you only change the cluster public host in VertxOptions, Vert.x will bind EventBus transport servers to localhost while telling other nodes to connect to the public host.

    This kind of configuration is needed when running Vert.x on some cloud providers, but in most cases you only need to set the cluster host (and then the public host will default to its value):

    EventBusOptions eventBusOptions = new EventBusOptions()
      .setClustered(true)
      .setHost("10.10.1.160");