Search code examples
javafunctional-programmingvert.xpurely-functional

How to make some asynch calls in a loop


In a loop i need to make some checks, performed actually in some another verticle. In each iteration of my loop i need to check the response code, returned from those verticle and make some decision accordingly. In some other words i need to stop the execution of my loop and somehow wait till asynch. call returns. But such execution stop violates the vert.x philosophy, which states that main thread execution should be never stopped. How can i do it in the scope of Vert.x? So far i don't know how to do this. Any suggestions/code samples/urls to smth. like a solution would b highly appreciated.

Thanks!


Solution

  • When working with Vert.x you need to think less in terms of loops, and more in terms of callbacks. You should use eventBus to communicate between vertices. Let's say that what you want is something similar to this pseudocode:

    for (int i = 0; i < 4; i++) {
       int result = getVerticleResult();
       System.out.println(result);
    }
    

    So, just a very basic example

    class LooperVerticle extends AbstractVerticle {
    
        private int i = 4;
    
        @Override
        public void start() throws Exception {
            doWork();
        }
    
        private void doWork() {
            vertx.eventBus().send("channel", "", (o) -> {
                if (o.succeeded()) {
                    System.out.println(o.result().body());
                    i--;
                    if (i > 0) {
                        doWork();
                    }
                }
            });
        }
    }
    
    class WorkerVerticle extends AbstractVerticle {
    
        @Override
        public void start() throws Exception {
    
            vertx.eventBus().consumer("channel", (o) -> {
                // Generate some random number
                int num = ThreadLocalRandom.current().nextInt(0, 9);
    
                // Simulate slowness
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                o.reply(num);
            });
        }
    }
    

    To test:

    public class EventBusExample {
    
        public static void main(String[] args) {
            Vertx vertx = Vertx.vertx();
            vertx.deployVerticle(new LooperVerticle());
            vertx.deployVerticle(new WorkerVerticle());
        }
    }