Search code examples
project-reactorwatchservice

Why this reator code with block inside Flux.create couldn't work?


I've tried to use a watchService as a Flux generator and it couldn't work, and I also tried some simple block like Thread.sleep in the Flux.create method and it could work. I wonder why and what's the difference between these situations?

Code which could work,

    @Test
    public void createBlockSleepTest() throws InterruptedException {
        Flux.create(sink->{
            while (true) {
                try {
                    for(int i=0;i<2;i++)
                        sink.next(num++);
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).log().subscribeOn(Schedulers.parallel(),false).log()
                .subscribe(System.out::println);
        Thread.sleep(100000L);
    }

Code which couldn't work,

    @Test
    public void createBlockTest() throws IOException, InterruptedException {
        WatchService watchService = fileSystem.newWatchService();
        Path testPath = fileSystem.getPath("C:/testing");
        Files.createDirectories(testPath);
        WatchKey register = testPath.register(watchService, StandardWatchEventKinds.ENTRY_CREATE,StandardWatchEventKinds.ENTRY_MODIFY);
        Files.write(testPath.resolve("test1.txt"),"hello".getBytes());
        Thread.sleep(5000L);
        Flux.create(sink->{
            while (true) {
                try {
                    WatchKey key = watchService.take();
                    System.out.println("-----------------"+key);
                    for(WatchEvent event:key.pollEvents()){
                        sink.next(event.context());
                    }
                    Thread.sleep(5000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).log().subscribeOn(Schedulers.parallel(),false).log()
                .subscribe(System.out::println);
        Files.write(testPath.resolve("test2.txt"),"hello".getBytes());
        Thread.sleep(5000L);
        Files.write(testPath.resolve("test3.txt"),"hello".getBytes());
        Thread.sleep(10000L);
    }

I've noticed in the reactor's reference there is a notice for blocking in the create method. But why Thread.sleep works?

create doesn’t parallelize your code nor does it make it asynchronous, even though it can be used with asynchronous APIs. If you block within the create lambda, you expose yourself to deadlocks and similar side effects. Even with the use of subscribeOn, there’s the caveat that a long-blocking create lambda (such as an infinite loop calling sink.next(t)) can lock the pipeline: the requests would never be performed due to the loop starving the same thread they are supposed to run from. Use the subscribeOn(Scheduler, false) variant: requestOnSeparateThread = false will use the Scheduler thread for the create and still let data flow by performing request in the original thread.

Could anyone solve my puzzle?


Solution

  • Thread.sleep(5000L) will only block for 5s, so the create will move on after that delay, whereas WatchService#take blocks indefinitely unless a new WatchKey registers (in this case a new file). Since the code that creates files is after the create, there is a deadlock situation.