I have created an integration flow to read files from an SFTP server and process them. I realized that once there is an error with one of the files (an exception is thrown), the poll stops and no other files are processed until the next poll. How can I avoid this, so that the failed file is not marked as processed and the remaining files are still processed in the same poll?
My configuration is quite simple. I am using a non-transactional poller that is triggered every minute with max-messages-per-poll of 1000. The SftpStreamingInboundChannelAdapterSpec has a max-fetch-size of 10 and uses a composite file list filter with an SftpRegexPatternFileListFilter and an SftpPersistentAcceptOnceFileListFilter.
@Bean
public IntegrationFlow sftpInboundFlow(JdbcMetadataStore jdbcMetadataStore, DataSourceTransactionManager dataSourceTransactionManager) {
    return IntegrationFlows.from(sftpStreamingInboundChannelAdapterSpec(jdbcMetadataStore),
                    sourcePollingChannelAdapterSpec -> configureEndpoint(sourcePollingChannelAdapterSpec, dataSourceTransactionManager))
            .transform(new StreamTransformer())
            .channel("processingChannel")
            .get();
}

private SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec(JdbcMetadataStore jdbcMetadataStore) {
    SftpStreamingInboundChannelAdapterSpec sftpStreamingInboundChannelAdapterSpec = Sftp.inboundStreamingAdapter(documentEnrollementSftpRemoteFileTemplate())
            .filter(fileListFilter(jdbcMetadataStore))
            .maxFetchSize(10)
            .remoteDirectory("/the-directory");
    SftpStreamingMessageSource sftpStreamingMessageSource = sftpStreamingInboundChannelAdapterSpec.get();
    sftpStreamingMessageSource.setFileInfoJson(false);
    return sftpStreamingInboundChannelAdapterSpec;
}

private void configureEndpoint(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec, DataSourceTransactionManager dataSourceTransactionManager) {
    PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
            .maxMessagesPerPoll(1000);
    sourcePollingChannelAdapterSpec.autoStartup(true)
            .poller(pollerSpec);
}

@Bean
public CompositeFileListFilter<ChannelSftp.LsEntry> fileListFilter(JdbcMetadataStore jdbcMetadataStore) {
    String fileNameRegex = // get regex
    SftpRegexPatternFileListFilter sftpRegexPatternFileListFilter = new SftpRegexPatternFileListFilter(fileNameRegex);
    SftpPersistentAcceptOnceFileListFilter sftpPersistentAcceptOnceFileListFilter = new SftpPersistentAcceptOnceFileListFilter(jdbcMetadataStore, "");
    CompositeFileListFilter<ChannelSftp.LsEntry> compositeFileListFilter = new CompositeFileListFilter<>();
    compositeFileListFilter.addFilter(sftpRegexPatternFileListFilter);
    compositeFileListFilter.addFilter(sftpPersistentAcceptOnceFileListFilter);
    return compositeFileListFilter;
}
After reading this answer, I tried using a transactional poller as follows:
PollerSpec pollerSpec = Pollers.cron(sftpProperties.getPollCronExpression())
        .maxMessagesPerPoll(1000)
        .transactional(dataSourceTransactionManager);
but the result is that when processing of a file fails, the poll stops, all processed messages are rolled back, and the remaining messages are not processed until the next poll. My understanding of that answer was that every message would be processed in a separate transaction.
The only way I have found to achieve this so far is to surround the processing code with a try/catch block that catches all exceptions, so that the poll is not interrupted. In the catch block I manually remove the ChannelSftp.LsEntry from the composite file list filter. For this I needed to set the fileInfoJson property to false in the SftpStreamingMessageSource provided by the SftpStreamingInboundChannelAdapterSpec, so that the remote file info header carries the file info object instead of its JSON representation.
I find this approach rather convoluted, and it has the downside that files that fail and are removed from the filter are reprocessed immediately afterwards rather than in the following poll. I was hoping there is a more straightforward solution to my problem.
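The immediate reprocessing is consistent with how an accept-once filter with a remove hook behaves: once a name is removed, the very next directory listing accepts it again. With maxFetchSize(10) and maxMessagesPerPoll(1000), the streaming source likely lists the directory again within the same poll as soon as its fetched batch is used up, so the failed file comes straight back. Below is a minimal, framework-free model of that contract, assuming a simple name-based key; the class AcceptOnceModel is hypothetical, and the real SftpPersistentAcceptOnceFileListFilter keeps its keys in the MetadataStore and also takes the file's modification time into account.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical, simplified model of an accept-once filter with remove().
 * Not the Spring Integration class: keys are plain file names kept in memory.
 */
class AcceptOnceModel {

    private final Set<String> seen = new LinkedHashSet<>();

    /** Returns only the names not accepted before, and remembers them. */
    List<String> filter(List<String> listing) {
        List<String> accepted = new ArrayList<>();
        for (String name : listing) {
            if (seen.add(name)) { // add() returns false if the name was already seen
                accepted.add(name);
            }
        }
        return accepted;
    }

    /** Forgets a name so that the next listing accepts it again (the catch-block workaround). */
    boolean remove(String name) {
        return seen.remove(name);
    }

    public static void main(String[] args) {
        AcceptOnceModel filter = new AcceptOnceModel();
        List<String> listing = List.of("a.csv", "bad.csv");
        System.out.println(filter.filter(listing)); // [a.csv, bad.csv]
        filter.remove("bad.csv");                   // processing of bad.csv failed
        System.out.println(filter.filter(listing)); // [bad.csv] on the very next listing
    }
}
```

Deferring the retry to the next poll would therefore mean deferring the remove() call itself, rather than calling it in the same catch block.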
The solution with the try...catch is the way to go. The underlying cause is that an exception thrown from the processing bubbles up into the poller, and that stops the current cycle of up to maxMessagesPerPoll messages:
private Runnable createPoller() {
    return () ->
            this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (pollForMessage() == null) {
                        break;
                    }
                    count++;
                }
            });
}
where pollForMessage() looks like this:
private Message<?> pollForMessage() {
    try {
        return this.pollingTask.call();
    }
    catch (Exception e) {
        if (e instanceof MessagingException) {
            throw (MessagingException) e;
        }
        else {
            Message<?> failedMessage = null;
            if (this.transactionSynchronizationFactory != null) {
                Object resource = TransactionSynchronizationManager.getResource(getResourceToBind());
                if (resource instanceof IntegrationResourceHolder) {
                    failedMessage = ((IntegrationResourceHolder) resource).getMessage();
                }
            }
            throw new MessagingException(failedMessage, e); // NOSONAR (null failedMessage)
        }
    }
    finally {
        if (this.transactionSynchronizationFactory != null) {
            Object resource = getResourceToBind();
            if (TransactionSynchronizationManager.hasResource(resource)) {
                TransactionSynchronizationManager.unbindResource(resource);
            }
        }
    }
}
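To see the effect in isolation, here is a deliberately simplified, framework-free model of that loop. It is an assumption-laden sketch, not Spring Integration code: the class PollCycleModel and its runCycle method are hypothetical, the message source is collapsed into a Deque, and "send downstream" is a Consumer. Once the handler throws for one file, the loop is abandoned and the remaining files wait for the next poll.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical model of one polling cycle; names are illustrative only. */
class PollCycleModel {

    /** Receives until the source is empty or the limit is reached; returns completed messages. */
    static List<String> runCycle(Deque<String> source, int maxMessagesPerPoll, Consumer<String> handler) {
        List<String> completed = new ArrayList<>();
        int count = 0;
        try {
            while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) {
                String message = source.poll(); // like receive(): null means nothing left
                if (message == null) {
                    break;
                }
                handler.accept(message);        // like sending downstream: may throw
                completed.add(message);
                count++;
            }
        }
        catch (RuntimeException e) {
            // In the framework the exception leaves the loop and goes to the poller's
            // error handling; the rest of this cycle is skipped.
            System.out.println("cycle aborted: " + e.getMessage());
        }
        return completed;
    }

    public static void main(String[] args) {
        Deque<String> files = new ArrayDeque<>(List.of("a.csv", "b.csv", "bad.csv", "c.csv", "d.csv"));
        Consumer<String> handler = name -> {
            if (name.startsWith("bad")) {
                throw new IllegalStateException("cannot process " + name);
            }
        };
        List<String> done = runCycle(files, 1000, handler); // prints: cycle aborted: cannot process bad.csv
        System.out.println("completed: " + done);           // completed: [a.csv, b.csv]
        System.out.println("left for next poll: " + files); // left for next poll: [c.csv, d.csv]
    }
}
```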
Anyway, there is still a way to isolate one message from the others within a single polling cycle. For this purpose, take a look at the request handler advice chain and investigate a solution with the ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/current/reference/html/#message-handler-advice-chain
So you add this advice to your handler endpoint downstream, catch exceptions there, and do your specific error handling without re-throwing the exceptions to the poller.
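In the real flow this would mean creating an ExpressionEvaluatingRequestHandlerAdvice, calling setTrapException(true) on it (plus, for example, setFailureChannel(...) or setOnFailureExpressionString(...) for your error handling), and attaching it to the processing endpoint, e.g. via `e -> e.advice(advice)` on a `.handle(...)` in the Java DSL. Because that needs the Spring context, the sketch below only models the idea without the framework: the hypothetical `trapping` wrapper catches the handler's exception, hands it to a failure callback, and returns normally, so the polling loop keeps going. TrappingAdviceModel and its methods are illustrative names, not library API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Consumer;

/** Hypothetical model of a handler advice that traps exceptions; not Spring code. */
class TrappingAdviceModel {

    /** Wraps a handler so that failures go to onFailure instead of propagating to the caller. */
    static Consumer<String> trapping(Consumer<String> handler, BiConsumer<String, RuntimeException> onFailure) {
        return message -> {
            try {
                handler.accept(message);
            }
            catch (RuntimeException e) {
                onFailure.accept(message, e); // e.g. log it, move the file, reset the filter entry
            }
        };
    }

    /** Same loop shape as the poller: stop when the source is empty or the limit is hit. */
    static int runCycle(Deque<String> source, int maxMessagesPerPoll, Consumer<String> handler) {
        int count = 0;
        while (maxMessagesPerPoll <= 0 || count < maxMessagesPerPoll) {
            String message = source.poll();
            if (message == null) {
                break;
            }
            handler.accept(message);
            count++;
        }
        return count;
    }

    public static void main(String[] args) {
        Deque<String> files = new ArrayDeque<>(List.of("a.csv", "bad.csv", "c.csv"));
        List<String> processed = new ArrayList<>();
        List<String> failed = new ArrayList<>();
        Consumer<String> handler = name -> {
            if (name.startsWith("bad")) {
                throw new IllegalStateException("cannot process " + name);
            }
            processed.add(name);
        };
        int polled = runCycle(files, 1000, trapping(handler, (name, e) -> failed.add(name)));
        System.out.println(polled + " polled, processed=" + processed + ", failed=" + failed);
        // 3 polled, processed=[a.csv, c.csv], failed=[bad.csv]
    }
}
```

The design point is that the exception never reaches the loop, so one bad file no longer costs the rest of the cycle; what happens to the failed file (retry later, move it aside, keep it marked as seen) becomes an explicit decision in the failure callback.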