I have a reactive stream, but in a CLI app it terminates immediately, and I cannot find an acceptable solution.
Let's start with a simple example: a CLI app that creates a "long"-running daemon thread to simulate the situation I am experiencing with reactive streams:
public static void main(String[] args) {
    Thread thread = new Thread(() -> {
        try {
            System.out.println("Start.");
            Thread.sleep(10000);
            System.out.println("Well done!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    thread.setDaemon(true);
    thread.start();
}
If we run this, the app terminates immediately, because no non-daemon thread is left running once the main thread finishes. This example is easy to fix:
public static void main(String[] args) {
    Thread thread = new Thread(() -> {
        try {
            System.out.println("Start.");
            Thread.sleep(10000);
            System.out.println("Well done!");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    });
    thread.setDaemon(true);
    thread.start();
    try {
        thread.join();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
Now it's "well done", at least that's what gets printed. Now let's move on to reactive streams. The following terminates immediately:
public static void main(String[] args) {
    Flux.range(1, 10)
        .zipWith(Flux.interval(Duration.ofSeconds(1)))
        .map(Tuple2::getT1)
        .subscribe(System.out::println);
}
Ok, I can "fix" this like this:
public static void main(String[] args) {
    Flux.range(1, 10)
        .zipWith(Flux.interval(Duration.ofSeconds(1)))
        .map(Tuple2::getT1)
        .doOnNext(System.out::println)
        .blockLast();
}
but I don't want to use the side-effect method doOnNext or blockLast. It does not seem right to me, and I then lose the ability to use the errorConsumer and completeConsumer available in the subscribe method. I've seen the following recommended:
public static void main(String[] args) {
    Disposable disposable = Flux.range(1, 10)
        .zipWith(Flux.interval(Duration.ofSeconds(1)))
        .map(Tuple2::getT1)
        .subscribe(System.out::println);
    while (!disposable.isDisposed()) {
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
and while working with the Disposable is fine, active waiting (polling in a sleep loop) is not.
What is the correct solution here for using subscribe and somehow joining the "reactive threads"? And if subscribe isn't the way to go and I can reformulate it differently (still with all 3 consumers), then how?
What you are describing is that you don't want to use Reactor's blocking mechanism, but instead want to implement a custom blocking mechanism that does virtually the same thing.
Just use blockLast(); it's totally valid for this situation.
However, if your application is more complex than your example, make sure that you really block only once, and only on the main thread!
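If you still want to keep subscribe with all three consumers, the usual way to wait without polling is a CountDownLatch that is counted down from the terminal callbacks. Below is a minimal sketch of that idea, in the same spirit as the thread example in the question. To keep it runnable without Reactor on the classpath, it uses the JDK's own java.util.concurrent.Flow and SubmissionPublisher as a stand-in for Flux; the class LatchJoinSketch and its helper methods subscribe and run are hypothetical names made up for this illustration. With Reactor, the same latch would be counted down in the errorConsumer and completeConsumer passed to subscribe.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Consumer;

// Illustrative sketch only: JDK Flow stands in for Reactor's Flux.
class LatchJoinSketch {

    // Hypothetical helper: subscribes with three callbacks and returns a latch
    // that is released once the stream terminates (by error or completion).
    static <T> CountDownLatch subscribe(Flow.Publisher<T> source,
                                        Consumer<? super T> consumer,
                                        Consumer<? super Throwable> errorConsumer,
                                        Runnable completeConsumer) {
        CountDownLatch done = new CountDownLatch(1);
        source.subscribe(new Flow.Subscriber<T>() {
            @Override public void onSubscribe(Flow.Subscription s) {
                s.request(Long.MAX_VALUE); // unbounded demand
            }
            @Override public void onNext(T item) {
                consumer.accept(item);
            }
            @Override public void onError(Throwable t) {
                try { errorConsumer.accept(t); } finally { done.countDown(); }
            }
            @Override public void onComplete() {
                try { completeConsumer.run(); } finally { done.countDown(); }
            }
        });
        return done;
    }

    // Publishes 1..count asynchronously, blocks once on the latch,
    // and returns the events the callbacks recorded.
    static List<String> run(int count) {
        List<String> events = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>();
        CountDownLatch done = subscribe(publisher,
                i -> events.add("next " + i),
                e -> events.add("error " + e),
                () -> events.add("complete"));
        for (int i = 1; i <= count; i++) {
            publisher.submit(i); // delivered on another thread
        }
        publisher.close();       // triggers onComplete after pending items
        try {
            done.await();        // the "join": no sleep loop needed
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return events;
    }

    public static void main(String[] args) {
        run(3).forEach(System.out::println);
    }
}
```

Note that this is still blocking the main thread exactly once, which is what blockLast() does for you; the latch only buys you the freedom to keep the three separate callbacks.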