I have a scheduled task (enabled with @EnableScheduling) in a Spring Boot web service that runs at regular intervals. When the task fires, it calls the run method of the registered Runnable. That run method has to do its work and must not return until the work is complete. The problem is that the work depends on results produced by other threads. So in the run method I have something like this:
@Component
public class DoWork implements Runnable {
    @Override
    public void run() {
        // Setup clients.
        // Call services.
        Mono<String> response1 = client1.post();
        response1.subscribe(new MyResponseCallback(), new MyErrorCallback());
        Mono<String> response2 = client2.post();
        response2.subscribe(new MyResponseCallback(), new MyErrorCallback());
        Mono<String> responseX = clientX.post();
        responseX.subscribe(new MyResponseCallback(), new MyErrorCallback());
        // Poll until the callbacks signal that their work is done.
        while (!callbacksWorkCompletedFlag) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop waiting.
                Thread.currentThread().interrupt();
                return;
            }
        }
        // Do computation with callback responses.
        // After computation is completed, exit run method.
    }
}
public class MyResponseCallback implements Consumer<String> {
    @Override
    public void accept(final String response) {
        // Do work with response.
    }
}
public class MyErrorCallback implements Consumer<Throwable> {
    @Override
    public void accept(final Throwable error) {
        // Log error.
    }
}
Is there a better way to do this in Java/Spring Boot?
Here is an example using CompletableFuture. It uses the third parameter of Mono.subscribe to let the future know when it's done.
@Override
public void run() {
    Mono<String> response1 = client1.post();
    CompletableFuture<?> future1 = new CompletableFuture<>();
    response1.subscribe(
            new MyResponseCallback(), new MyErrorCallback(),
            () -> future1.complete(null));
    Mono<String> response2 = client2.post();
    CompletableFuture<?> future2 = new CompletableFuture<>();
    response2.subscribe(
            new MyResponseCallback(), new MyErrorCallback(),
            () -> future2.complete(null));
    Mono<String> responseX = clientX.post();
    CompletableFuture<?> futureX = new CompletableFuture<>();
    responseX.subscribe(
            new MyResponseCallback(), new MyErrorCallback(),
            () -> futureX.complete(null));
    CompletableFuture.allOf(future1, future2, futureX).join();
}
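One caveat with the approach above: in Reactor the completion callback is not invoked when the Mono terminates with an error, so a future that is only completed there would never finish and join() would block indefinitely. Here is a minimal sketch of completing the future on both paths. It uses only java.util.concurrent; `fakePost`, `call` and `awaitAll` are hypothetical helpers standing in for the real clients and `Mono.subscribe`, not part of any library.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.function.Consumer;

class FutureCompletionSketch {

    // Hypothetical stand-in for client.post().subscribe(onNext, onError, onComplete).
    // Runs asynchronously and calls EITHER onError OR onComplete, never both.
    static void fakePost(String body, boolean fail, Consumer<String> onNext,
                         Consumer<Throwable> onError, Runnable onComplete) {
        CompletableFuture.runAsync(() -> {
            if (fail) {
                onError.accept(new IllegalStateException("request failed: " + body));
            } else {
                onNext.accept(body);
                onComplete.run();
            }
        });
    }

    // Bridges one call to a future that is completed on the success AND the error path.
    static CompletableFuture<String> call(String body, boolean fail) {
        CompletableFuture<String> future = new CompletableFuture<>();
        fakePost(body, fail,
                future::complete,               // a value arrived
                future::completeExceptionally,  // error: without this, join() would never return
                () -> future.complete(null));   // empty completion; no-op if already completed
        return future;
    }

    // Waits for all calls; returns true only if every call succeeded.
    static boolean awaitAll(List<CompletableFuture<String>> futures) {
        try {
            CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
            return true;
        } catch (CompletionException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(awaitAll(List.of(call("a", false), call("b", false))));
        System.out.println(awaitAll(List.of(call("a", false), call("b", true))));
    }
}
```

With real Reactor types, the same idea would mean completing the future from the error callback as well as from the completion callback.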
Here is a CountDownLatch example:
@Override
public void run() {
    CountDownLatch latch = new CountDownLatch(3);
    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), latch::countDown);
    try {
        // Block until all three completion callbacks have counted down.
        latch.await();
    } catch (InterruptedException ex) {
        // Restore the interrupt flag instead of swallowing the exception.
        Thread.currentThread().interrupt();
    }
}
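The latch version only reaches zero if every call signals completion, and an unbounded await() can stall the scheduler thread if one never does. Here is a hedged sketch of the same idea with a count-down on both success and failure plus a bounded wait. The async work is simulated with CompletableFuture, and `runAndAwait` and its parameters are hypothetical names chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchTimeoutSketch {

    // Starts `total` simulated async tasks, the first `failing` of which fail,
    // and waits up to timeoutMillis. Returns true if every task reached a
    // terminal state (success or failure) before the timeout.
    static boolean runAndAwait(int total, int failing, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(total);
        for (int i = 0; i < total; i++) {
            boolean fail = i < failing;
            CompletableFuture
                    .supplyAsync(() -> {
                        if (fail) {
                            throw new IllegalStateException("simulated failure");
                        }
                        return "ok";
                    })
                    // whenComplete runs on success AND on failure,
                    // so the latch is always counted down.
                    .whenComplete((value, error) -> latch.countDown());
        }
        try {
            // Bounded wait: returns false if the count has not reached zero in time.
            return latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(runAndAwait(3, 1, 5_000));
    }
}
```

In the Reactor code this would correspond to counting down in the error callback too, or in a single place such as `doFinally`, and to calling `latch.await(timeout, unit)` rather than the unbounded `await()`.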
Another CompletableFuture example:
@Override
public void run() {
    List<CompletableFuture<?>> futures = new ArrayList<>();
    // Each call to onDone.get() registers a new future and returns the callback that completes it.
    Supplier<Runnable> onDone = () -> {
        CompletableFuture<?> future = new CompletableFuture<>();
        futures.add(future);
        return () -> future.complete(null);
    };
    Mono<String> response1 = client1.post();
    response1.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());
    Mono<String> response2 = client2.post();
    response2.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());
    Mono<String> responseX = clientX.post();
    responseX.subscribe(new MyResponseCallback(), new MyErrorCallback(), onDone.get());
    CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new)).join();
}
Are all the callbacks actually required? If not, you could block on each Mono directly:
@Override
public void run() {
    // Make requests
    Mono<String> responseMono1 = client1.post();
    Mono<String> responseMono2 = client2.post();
    Mono<String> responseMonoX = clientX.post();
    try {
        // Wait for requests to complete
        String response1 = responseMono1.block();
        String response2 = responseMono2.block();
        String responseX = responseMonoX.block();
        ...
    } catch (RuntimeException e) {
        ...
    }
}