Search code examples
javaspringspring-bootspring-integration

Spring Integration FileReadingMessageSource use UseWatchService


I use FileReadingMessageSource as source for Message in Spring Integration ESB.

In particular I would like to use the WatchServiceDirectoryScanner. I use the Annotation based configuration with spring boot.

If I do not use WatchServiceDirectoryScanner everything is fine but when I set setUseWatchService(true) I can't get it to work.

Only the file present in the directory when the application starts are generate new messages in the channel. The files copied or created in the directory do not generate any message.

I would like to use WatchServiceDirectoryScanner since I want to generate a message only

This is the code where I configure the channel the adapter and the source:

    @Bean(name="source")
    public FileReadingMessageSource getFileMessageSource(){
        FileReadingMessageSource lm = new FileReadingMessageSource();
        lm.setBeanName("fileMessageSource");
        lm.setDirectory(new File("C:/DestDir"));
        lm.setAutoCreateDirectory(true);

        lm.setFilter(new AcceptAllFileListFilter<>());
        lm.setUseWatchService(true);
        lm.setWatchEvents(WatchEventType.CREATE);
        return lm;
    }

@Bean(name="channel")
    public PublishSubscribeChannel getFileChannel(){
        PublishSubscribeChannel psc = new PublishSubscribeChannel();
        psc.setLoggingEnabled(true);
        psc.setComponentName("channelexp");
        return psc;

    }


    @Bean
    @DependsOn({"source","channel"})
    public SourcePollingChannelAdapter  getChannelAdapter(){
        SourcePollingChannelAdapter spca = new SourcePollingChannelAdapter();
        FileReadingMessageSource frms = context.getBean(FileReadingMessageSource.class);
        PublishSubscribeChannel psc = context.getBean("channel",PublishSubscribeChannel.class);
        spca.setSource(frms);
        spca.setOutputChannel(psc);
        return spca;
    }

Even If I use @InboundChannelAdapter there is no difference

@Bean(name="source")
    @InboundChannelAdapter(value = "channel", poller = @Poller(fixedDelay = "1000"))
    public FileReadingMessageSource getFileMessageSource(){
        FileReadingMessageSource lm = new FileReadingMessageSource();
        lm.setDirectory(new File("C:/fromDir/"));
        lm.setFilter(new AcceptAllFileListFilter<>());
        lm.setUseWatchService(true);
        lm.setWatchEvents(WatchEventType.CREATE);
        return lm;
    }

Where I'm doing wrong?


Solution

  • I don't see polling code in your configuration, but I wonder if you have anything else there:

    1. You have to use @EnableIntegration
    2. The SourcePollingChannelAdapter should be supplied with the PollerMetadata
    3. Or just consider to use @InboundChannelAdapter (See Reference Manual)
    4. It is really looks odd to use context.getBean() during bean definition. You should use injection there. The former is for runtime and not for initialization phase.

    EDIT

    Spring Integration Java DSL sample:

    @SpringBootApplication
    public class FileChangeLineSeparator {
    
        public static void main(String[] args) throws Exception {
            ConfigurableApplicationContext context = new SpringApplicationBuilder(FileChangeLineSeparator.class)
                    .web(false)
                    .run(args);
            System.out.println("Put a windows file with a .txt extension in /tmp/in,"
                    + "\nthe file will be converted to Un*x and placed in"
                    + "\n/tmp/out"
                    + "\n\nHit enter to terminate");
            System.in.read();
            context.close();
        }
    
    @Bean
    @InboundChannelAdapter(value = "channel", poller = @Poller(fixedDelay = "1000"))
    public FileReadingMessageSource getFileMessageSource() {
        FileReadingMessageSource lm = new FileReadingMessageSource();
        lm.setDirectory(new File("/tmp/in"));
        lm.setFilter(new AcceptAllFileListFilter<>());
        lm.setUseWatchService(true);
        lm.setWatchEvents(FileReadingMessageSource.WatchEventType.CREATE);
        return lm;
    }
    
    @Bean
    public IntegrationFlow fileToFile() {
        return IntegrationFlows.from("channel")                 
                    .transform(Transformers.fileToString())
                    .transform("payload.replaceAll('\r\n', '\n')")
                    .handle(Files.outboundAdapter("'/tmp/out'")
                            .autoCreateDirectory(true))
                    .get();
        }
    
    }
    

    I have created new files eventually and they are picked up properly and processed.