I tried to use a WatchService as a Flux generator and it didn't work, whereas a simple blocking call such as Thread.sleep inside Flux.create did work. Why is that, and what is the difference between the two situations?
Code that works:
@Test
public void createBlockSleepTest() throws InterruptedException {
    Flux.create(sink -> {
        while (true) {
            try {
                for (int i = 0; i < 2; i++) {
                    sink.next(num++);
                }
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).log().subscribeOn(Schedulers.parallel(), false).log()
            .subscribe(System.out::println);
    Thread.sleep(100000L);
}
Code that doesn't work:
@Test
public void createBlockTest() throws IOException, InterruptedException {
    WatchService watchService = fileSystem.newWatchService();
    Path testPath = fileSystem.getPath("C:/testing");
    Files.createDirectories(testPath);
    WatchKey register = testPath.register(watchService,
            StandardWatchEventKinds.ENTRY_CREATE, StandardWatchEventKinds.ENTRY_MODIFY);
    Files.write(testPath.resolve("test1.txt"), "hello".getBytes());
    Thread.sleep(5000L);
    Flux.create(sink -> {
        while (true) {
            try {
                WatchKey key = watchService.take();
                System.out.println("-----------------" + key);
                for (WatchEvent<?> event : key.pollEvents()) {
                    sink.next(event.context());
                }
                Thread.sleep(5000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }).log().subscribeOn(Schedulers.parallel(), false).log()
            .subscribe(System.out::println);
    Files.write(testPath.resolve("test2.txt"), "hello".getBytes());
    Thread.sleep(5000L);
    Files.write(testPath.resolve("test3.txt"), "hello".getBytes());
    Thread.sleep(10000L);
}
I noticed that the Reactor reference guide has a warning about blocking inside the create method. So why does Thread.sleep work?
create doesn’t parallelize your code nor does it make it asynchronous, even though it can be used with asynchronous APIs. If you block within the create lambda, you expose yourself to deadlocks and similar side effects. Even with the use of subscribeOn, there’s the caveat that a long-blocking create lambda (such as an infinite loop calling sink.next(t)) can lock the pipeline: the requests would never be performed due to the loop starving the same thread they are supposed to run from. Use the subscribeOn(Scheduler, false) variant: requestOnSeparateThread = false will use the Scheduler thread for the create and still let data flow by performing request in the original thread.
Could anyone solve my puzzle?
Thread.sleep(5000L) only blocks for 5 seconds, so the create lambda moves on after that delay, whereas WatchService#take blocks indefinitely until a WatchKey is signalled (in this case, by a new file being created). Since the code that creates the files comes after the create call, there is a deadlock situation.
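To make the difference between a bounded wait and take concrete, here is a minimal JDK-only sketch (no Reactor involved). It relies on WatchService#poll(long, TimeUnit), which returns null when no key is signalled within the timeout, so the caller always regains control, much like Thread.sleep does. The class name WatchPollSketch, the helpers pollOnce and demo, the temporary directory, and the 1-second and 30-second timeouts are all assumptions made for illustration and are not part of the original code.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class WatchPollSketch {

    // Hypothetical helper: waits at most timeoutSeconds for a signalled key and
    // returns the file names it carried. An empty list means the wait timed out
    // (or the thread was interrupted).
    static List<String> pollOnce(WatchService watcher, long timeoutSeconds) {
        List<String> names = new ArrayList<>();
        try {
            WatchKey key = watcher.poll(timeoutSeconds, TimeUnit.SECONDS);
            if (key == null) {
                return names; // nothing was signalled in time; the caller regains control
            }
            for (WatchEvent<?> event : key.pollEvents()) {
                if (event.kind() != StandardWatchEventKinds.OVERFLOW) {
                    names.add(event.context().toString());
                }
            }
            key.reset(); // re-arm the key so that later events are queued again
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
        }
        return names;
    }

    // Hypothetical demo: polls an idle directory, then creates a file and polls again.
    // Returns [names seen while idle, names seen after the file was created].
    static List<List<String>> demo() {
        try {
            Path dir = Files.createTempDirectory("watch-sketch");
            try (WatchService watcher = dir.getFileSystem().newWatchService()) {
                dir.register(watcher, StandardWatchEventKinds.ENTRY_CREATE);
                List<String> idle = pollOnce(watcher, 1);
                Files.write(dir.resolve("test2.txt"), "hello".getBytes());
                // Generous timeout: some platforms detect changes by periodic scanning.
                List<String> afterCreate = pollOnce(watcher, 30);
                return Arrays.asList(idle, afterCreate);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        List<List<String>> result = demo();
        System.out.println("idle poll: " + result.get(0));
        System.out.println("after create: " + result.get(1));
    }
}
```

Inside a Flux.create loop, swapping take() for a bounded poll like this would presumably let each iteration also check sink.isCancelled() before waiting again. Treat that as a suggestion to try rather than a verified fix for the test above.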