Cannot multithread on an inbound SFTP Spring integration. Want the service activator to activate on a separate thread as soon as the poller picks a file . With the below code, the serviceactivator comes into picture only after all the files are processed by the messageSource. If the input sftp director has more than 2gb files its taking so long to reach the service activator. Did I miss anything ?
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory();
factory.setHost(sftpHost);
factory.setPort(sftpPort);
factory.setAllowUnknownKeys(true);
factory.setUser(sftpUser);
factory.setPassword(sftpPassword);
factory.setAllowUnknownKeys(true);
return factory;
}
@Bean(name="defaultsync")
public SftpInboundFileSynchronizer synchronizer(){
SftpInboundFileSynchronizer sync = new SftpInboundFileSynchronizer(createNewSFTPSessionFactory());
sync.setDeleteRemoteFiles(false);
sync.setRemoteDirectory(sftpDirectory);
sync.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return sync;
}
@Bean(name="sftpMessageSource")
@InboundChannelAdapter(channel="fileuploaded", poller = @Poller(fixedDelay = "3000"))
public MessageSource<File> sftpMessageSource(){
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(synchronizer());
source.setLocalDirectory(new File("C:\\Users\\Administrator\\Documents\\myfolder"));
source.setAutoCreateLocalDirectory(true);
source.setMaxFetchSize(-1);
return source;
}
@ServiceActivator(inputChannel = "fileuploaded")
public void handleIncomingFile(File file) throws IOException {
log.info(String.format("handleIncomingFile BEGIN %s", file.getName()));
String content = FileUtils.readFileToString(file, "UTF-8");
log.info(String.format("Content: %s", content));
if(awsFileService.putFileInAWSS3(file))
System.out.println("Processed file : "+file.getName());
}
How can I call the ServiceActivator as soon as the file is received by the messageSource ? Any pointers would be appreciated.
Update
There were 3 problems : 1.) Because the fetch size was set to negative number, it will call the serviceactivator only after all of the files in input channel are processed.
2.) Absence of filter causing same file to repeatedly get processed .Had to add filter.
3.) Because of the datasize and the number of files to be processed , the SimpleMetadatastore approach was failing , so had to use jdbc store in the filter.
Got the below error :
Caused by: org.springframework.jdbc.UncategorizedSQLException: PreparedStatementCallback; uncategorized SQLException for SQL [INSERT INTO INT_METADATA_STORE(METADATA_KEY, METADATA_VALUE, REGION) SELECT ?, ?, ? FROM INT_METADATA_STORE WHERE METADATA_KEY=? AND REGION=? HAVING COUNT(*)=0]; SQL state [S0002]; error code [208]; Invalid object name 'INT_METADATA_STORE'
Problem resolved by adding the below code and by adding the default spring datasource
@Autowired
DataSource dataSource;
@Bean(name="defaultsync")
public SftpInboundFileSynchronizer synchronizer(){
SftpInboundFileSynchronizer sync = new SftpInboundFileSynchronizer(createNewSFTPSessionFactory());
sync.setDeleteRemoteFiles(false);
sync.setRemoteDirectory(sftpDirectory);
// sync.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
sync.setFilter(getFilter());
return sync;
}
public ChainFileListFilter<DirEntry> getFilter()
{
ChainFileListFilter<DirEntry> chainFilter = new ChainFileListFilter<>();
chainFilter.addFilter(new SftpRegexPatternFileListFilter(".*\\.(xml|XML)"));
chainFilter.addFilter(new SftpPersistentAcceptOnceFileListFilter(getMetadataStore(),"MyPrefix"));
return chainFilter;
}
@Bean
public ConcurrentMetadataStore getMetadataStore()
{
return new JdbcMetadataStore(dataSource);
}
Also had to run the script for my database from the following link :
Now the files are getting processed.
source.setMaxFetchSize(-1);
This means fetch all files that pass the filter first; source.setMaxFetchSize(1);
will fetch files one-at-a-time.