java spring spring-boot java-threads thread-synchronization

Sleeping in Spring Boot


I have a scheduled task (enabled with @EnableScheduling) in a Spring Boot web service that repeats at a regular interval. When the task fires, it calls the run method of the registered Runnable. Inside run, I need to do some work and must not return until that work is complete. The problem is that this work depends on results produced by other threads. So in the run method I have something like this:

import java.util.function.Consumer;

import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

@Component
public class DoWork implements Runnable {

    @Override
    public void run() {

        // Setup clients.

        // Call services.
        Mono<String> response1 = client1.post();
        response1.subscribe(new MyResponseCallback(), new MyErrorCallback());

        Mono<String> response2 = client2.post();
        response2.subscribe(new MyResponseCallback(), new MyErrorCallback());

        Mono<String> responseX = clientX.post();
        responseX.subscribe(new MyResponseCallback(), new MyErrorCallback());

        // Poll once a second until the callbacks report that they are done.
        while (!callbacksWorkCompletedFlag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Thread.sleep throws a checked exception that run() cannot propagate.
                Thread.currentThread().interrupt();
                return;
            }
        }

        // Do computation with callback responses.

        // After computation is completed, exit run method.
    }
}

public class MyResponseCallback implements Consumer<String> {

    @Override
    public void accept(final String response) {

        // Do work with response.
    }
}

public class MyErrorCallback implements Consumer<Throwable> {

    @Override
    public void accept(final Throwable error) {

        // Log error.
    }
}

Is there a better way to do this in Java/Spring Boot?


Solution

  • Here is an example using CompletableFuture. It passes a completion callback as the third argument to Mono.subscribe, which completes the future when the Mono finishes.

    @Override
    public void run() {
        Mono<String> response1 = client1.post();
        CompletableFuture<?> future1 = new CompletableFuture<>();
        response1.subscribe(
                    new MyResponseCallback(), new MyErrorCallback(),
                    () -> future1.complete(null));
    
        Mono<String> response2 = client2.post();
        CompletableFuture<?> future2 = new CompletableFuture<>();
        response2.subscribe(
                    new MyResponseCallback(), new MyErrorCallback(),
                    () -> future2.complete(null));
    
        Mono<String> responseX = clientX.post();
        CompletableFuture<?> futureX = new CompletableFuture<>();
        responseX.subscribe(
                    new MyResponseCallback(), new MyErrorCallback(),
                    () -> futureX.complete(null));
        
        CompletableFuture.allOf(future1, future2, futureX).join();
    }
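
One caveat with the example above: the third argument to Mono.subscribe runs only when the Mono completes successfully. If a request fails, only the error callback is called, its future is never completed, and join() waits forever. The following is a minimal JDK-only sketch of one way to guard against that, by completing the future from the error path as well and bounding the wait with a timeout. The names ErrorAwareWait, fakeSubscribe, call and awaitAll are hypothetical, and fakeSubscribe is only a stand-in that mimics the callback order of Mono.subscribe; it is not Reactor code.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

final class ErrorAwareWait {

    // Hypothetical stand-in for Mono.subscribe(onNext, onError, onComplete):
    // a failing source calls onError only and never calls onComplete.
    static void fakeSubscribe(boolean fails, Consumer<String> onNext,
                              Consumer<Throwable> onError, Runnable onComplete) {
        CompletableFuture.runAsync(() -> {
            if (fails) {
                onError.accept(new IllegalStateException("request failed"));
            } else {
                onNext.accept("ok");
                onComplete.run();
            }
        });
    }

    // Bridges the three callbacks into one future that finishes on success or failure.
    static CompletableFuture<Void> call(boolean fails) {
        CompletableFuture<Void> done = new CompletableFuture<>();
        fakeSubscribe(fails,
                response -> { /* do work with response */ },
                done::completeExceptionally,    // the error path also releases the waiter
                () -> done.complete(null));
        return done;
    }

    // Returns true if every call succeeded; false if any failed or the wait timed out.
    static boolean awaitAll(long timeoutMillis, CompletableFuture<?>... futures) {
        try {
            CompletableFuture.allOf(futures).get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (ExecutionException | TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();  // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAll(5_000, call(false), call(false)));
        System.out.println(awaitAll(5_000, call(false), call(true)));
    }
}
```

In the run method above, the same idea would mean completing each future from the error callback too, for example by wrapping MyErrorCallback in a lambda that also calls completeExceptionally.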
    

    Here is a CountDownLatch example:

    @Override
    public void run() {
        CountDownLatch latch = new CountDownLatch(3);
    
        Mono<String> response1 = client1.post();
        response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    
        Mono<String> response2 = client2.post();
        response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    
        Mono<String> responseX = clientX.post();
        responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
        
        try {
            latch.await();
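        } catch (InterruptedException ex) {
            // Restore the interrupt flag instead of silently swallowing it.
            Thread.currentThread().interrupt();
        }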
    }
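
Because latch.await() has no timeout, the scheduled thread stays parked if any callback never counts down. A hedged sketch of a bounded wait is shown below; it uses only the JDK, the names TimedLatchWait and awaitCallbacks are hypothetical, and the plain threads in main merely stand in for asynchronous callbacks.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

final class TimedLatchWait {

    // Waits until the latch reaches zero or the timeout elapses.
    // Returns true only if every expected callback counted down in time.
    static boolean awaitCallbacks(CountDownLatch latch, long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();  // preserve the interrupt for the caller
            return false;
        }
    }

    public static void main(String[] args) {
        CountDownLatch latch = new CountDownLatch(3);
        for (int i = 0; i < 3; i++) {
            // Stand-in for an asynchronous completion callback.
            new Thread(latch::countDown).start();
        }
        System.out.println(awaitCallbacks(latch, 2, TimeUnit.SECONDS));
    }
}
```

A false result lets run() log the problem and return, rather than blocking the scheduler thread indefinitely.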
    

    Another CompletableFuture example, where a Supplier creates and registers a future for each subscription:

    @Override
    public void run() {
        List<CompletableFuture<?>> futures = new ArrayList<>();
        Supplier<Runnable> onDone = () -> {
            CompletableFuture<?> future = new CompletableFuture<>();
            futures.add(future);
            return () -> future.complete(null);
        };
    
        Mono<String> response1 = client1.post();
        response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());
    
        Mono<String> response2 = client2.post();
        response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());
    
        Mono<String> responseX = clientX.post();
        responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());
    
        CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
    }
    

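    Are all the callbacks actually required? If not, you can skip subscribe and block on each Mono directly. Note that block() subscribes to one Mono at a time, so if the Monos are cold (as those returned by WebClient are), the requests run one after another; Mono.zip(responseMono1, responseMono2, responseMonoX).block() would subscribe to all three before waiting.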

    @Override
    public void run() {
        // Create the request Monos (a cold Mono sends nothing until it is subscribed)
        Mono<String> responseMono1 = client1.post();
        Mono<String> responseMono2 = client2.post();
        Mono<String> responseMonoX = clientX.post();
        try {
            // Wait for requests to complete
            String response1 = responseMono1.block();
            String response2 = responseMono2.block();
            String responseX = responseMonoX.block();
    
            ...
        }
        catch (RuntimeException e) {
            ...
        }
    }