I am building a simple integration service with AWS S3 and am having trouble when an exception occurs.
My requirement is to poll an S3 bucket periodically and apply a transformation whenever a new file is placed in the bucket. The code snippet below works fine, but when an exception occurs it retries again and again. I do not want that to happen. Can someone help me here?
The IntegrationFlow is defined as follows:
@Configuration
public class S3Routes {

    @Bean
    public IntegrationFlow downloadFlow(MessageSource<InputStream> s3InboundStreamingMessageSource) {
        return IntegrationFlows.from(s3InboundStreamingMessageSource)
                .channel("s3Channel")
                .handle("QueryServiceImpl", "processFile")
                .get();
    }
}
The configuration class is as follows:
@Service
public class S3AppConfiguration {

    @Bean
    @InboundChannelAdapter(value = "s3Channel")
    public MessageSource<InputStream> s3InboundStreamingMessageSource(S3RemoteFileTemplate template) {
        S3StreamingMessageSource messageSource = new S3StreamingMessageSource(template);
        messageSource.setRemoteDirectory("my-bucket-name");
        messageSource.setFilter(new S3PersistentAcceptOnceFileListFilter(new SimpleMetadataStore(),
                "streaming"));
        return messageSource;
    }

    @Bean
    public PollableChannel s3Channel() {
        return new QueueChannel();
    }

    @Bean
    public S3RemoteFileTemplate template(AmazonS3 amazonS3) {
        return new S3RemoteFileTemplate(new S3SessionFactory(amazonS3));
    }

    @Bean(name = "amazonS3")
    public AmazonS3 nonProdAmazonS3(BasicAWSCredentials basicAWSCredentials) {
        ClientConfiguration config = new ClientConfiguration();
        config.setProxyHost("localhost");
        config.setProxyPort(3128);
        return AmazonS3ClientBuilder.standard().withRegion(Regions.fromName("ap-southeast-1"))
                .withCredentials(new AWSStaticCredentialsProvider(basicAWSCredentials))
                .withClientConfiguration(config)
                .build();
    }

    @Bean
    public BasicAWSCredentials basicAWSCredentials() {
        return new BasicAWSCredentials("access_key", "secret_key");
    }

    @Bean(name = PollerMetadata.DEFAULT_POLLER)
    public PollerMetadata nonProdPoller() {
        return Pollers.cron("* */2 * * * *")
                .get();
    }
}
The accept-once file list filter (S3PersistentAcceptOnceFileListFilter) that I use here prevents the same file from being handled again on continuous retries. However, I do not want to rely on it, because when a file is not processed on the first attempt, I want to retry it on the next poll (in the prod region this usually happens every hour). I tried calling filter.remove() whenever processing fails (on any exception), but that again results in continuous retries.
I am not sure how to disable the continuous retries on failure. Where should I configure it?
I took a look at "Spring Integration (Retry Strategy)". It is the same scenario but a different integration, and I am not sure how to set that up for my IntegrationFlow. Can someone help here? Thanks in advance.
That story is different: it talks about a listener container for AMQP. You use a source polling channel adapter, so the approach might be different.
You create two source polling channel adapters: one via @InboundChannelAdapter and another via IntegrationFlows.from(s3InboundStreamingMessageSource). Both of them produce data to the same channel. I am not sure that is really intentional.
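If only one adapter is wanted, one option is to keep the plain @Bean on the message source, drop the @InboundChannelAdapter annotation from it, and let the flow definition create the only adapter. The following is a minimal sketch under that assumption, not a drop-in replacement: the class name SingleAdapterRoutes is hypothetical, and the explicit poller with the cron value 0 */2 * * * * is an illustrative choice (the default poller bean from the question would also be picked up if the poller argument were left out).

```java
import java.io.InputStream;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;

// Hypothetical class name, used only for this sketch.
@Configuration
public class SingleAdapterRoutes {

    // Assumes s3InboundStreamingMessageSource is declared with @Bean only,
    // i.e. WITHOUT @InboundChannelAdapter, so this flow creates the single
    // source polling channel adapter for it.
    @Bean
    public IntegrationFlow downloadFlow(MessageSource<InputStream> s3InboundStreamingMessageSource) {
        return IntegrationFlows.from(s3InboundStreamingMessageSource,
                        // Illustrative poller: fires once at second 0 of every even minute.
                        e -> e.poller(Pollers.cron("0 */2 * * * *")))
                .channel("s3Channel")
                .handle("QueryServiceImpl", "processFile")
                .get();
    }
}
```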
It is not clear what the retry is in your case unless you really make that manual filter.remove() call. In that case it is indeed going to retry, but this is a single, uncontrolled retry: it is going to retry again only if you call filter.remove() again. So, if you do everything yourself, what is the question?
Consider using a RequestHandlerRetryAdvice configured for your handle() instead: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain. This way you are going to pull the remote file only once, and the retry is going to be managed by the Spring Retry API.
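As a rough sketch of what that could look like with the Java DSL from the question: the class name RetryAdviceRoutes, the limit of 3 attempts, and the 5 second back-off are all assumptions chosen for illustration, so adjust them to your own requirements.

```java
import java.io.InputStream;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.handler.advice.RequestHandlerRetryAdvice;
import org.springframework.retry.backoff.FixedBackOffPolicy;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// Hypothetical class name, used only for this sketch.
@Configuration
public class RetryAdviceRoutes {

    @Bean
    public RequestHandlerRetryAdvice retryAdvice() {
        RetryTemplate retryTemplate = new RetryTemplate();
        // Assumed values: at most 3 attempts in total, 5 seconds apart.
        retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3));
        FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
        backOffPolicy.setBackOffPeriod(5000L);
        retryTemplate.setBackOffPolicy(backOffPolicy);

        RequestHandlerRetryAdvice advice = new RequestHandlerRetryAdvice();
        advice.setRetryTemplate(retryTemplate);
        return advice;
    }

    @Bean
    public IntegrationFlow downloadFlow(MessageSource<InputStream> s3InboundStreamingMessageSource,
            RequestHandlerRetryAdvice retryAdvice) {
        return IntegrationFlows.from(s3InboundStreamingMessageSource)
                .channel("s3Channel")
                // The advice wraps only this handler, so retries re-invoke
                // processFile without polling the bucket again.
                .handle("QueryServiceImpl", "processFile", e -> e.advice(retryAdvice))
                .get();
    }
}
```

Once the attempts are exhausted the exception propagates as usual, so the number of retries is bounded by the retry policy rather than by the poller.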
UPDATE
So, after learning a bit about cron expressions, I realized that yours is wrong:
* */2 * * * * - means every second of every even minute
It must be like this:
0 */2 * * * * - at the beginning of every even minute
Perhaps there is something similar with your hourly cron expression on prod...
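To make the difference between the two expressions concrete, here is a small, self-contained sketch that counts how many seconds would trigger within the first four minutes of an hour. It is not Spring's cron parser: the class CronFieldDemo and its methods are hypothetical, and the matcher only understands `*`, `*/n`, and a plain number, and only looks at the seconds and minutes fields.

```java
public class CronFieldDemo {

    // Matches one numeric value against a single cron field.
    // Only "*", "*/n" and a plain number are supported in this sketch.
    public static boolean matches(String field, int value) {
        if (field.equals("*")) {
            return true;
        }
        if (field.startsWith("*/")) {
            return value % Integer.parseInt(field.substring(2)) == 0;
        }
        return Integer.parseInt(field) == value;
    }

    // Counts the seconds in minutes 0..(minutes - 1) that match the
    // seconds (first) and minutes (second) fields of a six-field expression.
    public static int countFirings(String cron, int minutes) {
        String[] fields = cron.trim().split("\\s+");
        int count = 0;
        for (int minute = 0; minute < minutes; minute++) {
            for (int second = 0; second < 60; second++) {
                if (matches(fields[1], minute) && matches(fields[0], second)) {
                    count++;
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Every second of minutes 0 and 2: prints 120
        System.out.println(countFirings("* */2 * * * *", 4));
        // Only second 0 of minutes 0 and 2: prints 2
        System.out.println(countFirings("0 */2 * * * *", 4));
    }
}
```

With the first expression the poller fires 60 times in each even minute, which can easily look like a continuous retry of the same failing file.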