Search code examples

StreamingMessageSource keeps firing when a filter is applied

I am trying to poll an FTP directory for a certain kind of file, the polling of a directory works, but whenever I try to apply a filter to filter the files by extension, the messagesource keeps spamming messages about the file with no regard to the polling delay. Without the filters everything works fine, once I enable them my application authenticates with the FTP, downloads the file and sends the message nonstop over and over again. I have the following beans:

 * Factory that creates the remote connection
 * @return DefaultSftpSessionFactory
public DefaultSftpSessionFactory sftpSessionFactory(@Value("${}") String ftpHost,
                                                    @Value("${ftp.port}") int ftpPort,
                                                    @Value("${ftp.user}") String ftpUser,
                                                    @Value("${ftp.pass}") String ftpPass) {
    DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();

    return factory;

 * Template to handle remote files
 * @param sessionFactory SessionFactory bean
 * @return SftpRemoteFileTemplate
public SftpRemoteFileTemplate fileTemplate(DefaultSftpSessionFactory sessionFactory) {
    SftpRemoteFileTemplate template = new SftpRemoteFileTemplate(sessionFactory);
    return template;

 * To listen to multiple directories, declare multiples of this bean with the same inbound channel
 * @param fileTemplate FileTemplate bean
 * @return MessageSource
@InboundChannelAdapter(channel = "deeplinkAutomated", poller = @Poller(fixedDelay = "6000", maxMessagesPerPoll = "-1"))
public MessageSource inboundChannelAdapter(SftpRemoteFileTemplate fileTemplate) {
    SftpStreamingMessageSource source = new SftpStreamingMessageSource(fileTemplate);
    source.setFilter(new CompositeFileListFilter<>(
            Arrays.asList(new AcceptOnceFileListFilter<>(), new SftpSimplePatternFileListFilter("*.trg"))

    return source;

 * Listener that activates on new messages on the specified input channel
 * @return MessageHandler
@ServiceActivator(inputChannel = "deeplinkAutomated")
public MessageHandler handler(JobLauncher jobLauncher, Job deeplinkBatch) {
    return message -> {
        Gson gson = new Gson();
        SFTPFileInfo info = gson.fromJson((String) message.getHeaders().get("file_remoteFileInfo"), SFTPFileInfo.class);
        System.out.println("File to download: " + info.getFilename().replace(".trg", ".xml"));


  • I think AcceptOnceFileListFilter is not suitable for SFTP files. The returned LsEntry doesn't match previously stored in the HashSet: just their hashes are different!

    Consider to use a SftpPersistentAcceptOnceFileListFilter instead.

    Also it would be better to configure a DefaultSftpSessionFactory for the isSharedSession:

     * @param isSharedSession true if the session is to be shared.
    public DefaultSftpSessionFactory(boolean isSharedSession) {

    To avoid session recreation on each polling task.

    you don't have a 6 seconds delay between calls because you have a maxMessagesPerPoll = "-1". That means poll remote files until they are there in remote dir. In your case with the AcceptOnceFileListFilter you always end up with the same file by the hash reason.