I have a requirement where my application should read messages from MQ and write them out using a file outbound channel adapter. Each output file should contain the messages received during one 10-minute interval. Is there a default implementation for this, or are there any pointers on how to do it?
public @Bean IntegrationFlow defaultJmsFlow()
{
    return IntegrationFlows.from(
            //read JMS topic
            Jms.messageDrivenChannelAdapter(this.connectionFactory)
                    .destination(this.config.getInputQueueName())
                    .errorChannel(errorChannel())
                    .configureListenerContainer(c ->
                    {
                        final DefaultMessageListenerContainer container = c.get();
                        container.setSessionTransacted(true);
                        container.setMaxMessagesPerTask(-1);
                    }).get())
            .channel(messageProcessingChannel()).get();
}

public @Bean MessageChannel messageProcessingChannel()
{
    return MessageChannels.queue().get();
}

public @Bean IntegrationFlow messageProcessingFlow() {
    return IntegrationFlows.from(messageProcessingChannel())
            .handle(Files.outboundAdapter(new File(config.getWorkingDir()))
                    .fileNameGenerator(fileNameGenerator())
                    .fileExistsMode(FileExistsMode.APPEND).appendNewLine(true))
            .get();
}
First of all, you could use a QueueChannel with a poller on the endpoint for the FileWritingMessageHandler, with fixedDelay set to those 10 minutes. Keep in mind, however, that the messages are held in memory until the poller does its work. So if your application crashes in the meantime, those messages are lost.
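To make the in-memory caveat concrete, here is a minimal plain-Java sketch of the idea. It does not use Spring Integration at all: the class InMemoryBatchDrain and its methods are hypothetical stand-ins, with a LinkedBlockingQueue playing the role of the QueueChannel and pollAll() playing the role of one poller run.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a QueueChannel plus a poller; not Spring Integration code.
public class InMemoryBatchDrain {

    // Messages wait here, in memory only, between two poller runs.
    private final BlockingQueue<String> queue = new LinkedBlockingQueue<>();

    // Called for every incoming message (the JMS listener side).
    public void send(String message) {
        queue.add(message);
    }

    // One poller run: take everything buffered so far as a single batch.
    public List<String> pollAll() {
        List<String> batch = new ArrayList<>();
        queue.drainTo(batch);
        return batch;
    }

    // Number of messages that would be lost if the process stopped right now.
    public int pending() {
        return queue.size();
    }

    public static void main(String[] args) {
        InMemoryBatchDrain channel = new InMemoryBatchDrain();
        channel.send("m1");
        channel.send("m2");
        System.out.println("pending before poll: " + channel.pending());
        System.out.println("batch: " + channel.pollAll());
        System.out.println("pending after poll: " + channel.pending());
    }
}
```

Whatever pending() reports between two runs exists only on the heap, which is the crash risk described above.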
On the other hand, you can use a JmsDestinationPollingSource with a similar poller configuration. In this case, however, you need to configure the poller with maxMessagesPerPoll(-1) so that it pulls as many messages from the MQ as possible during a single polling task, which runs once every 10 minutes.
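The effect of that setting can be illustrated with a small plain-Java sketch. This is not the framework's implementation: PollTaskSketch and runPollTask are hypothetical names, and the ArrayDeque merely stands in for the MQ destination. It assumes the usual contract that receive() returns null when nothing is available and that a negative limit means "no limit".

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical illustration of one polling task; not Spring Integration code.
public class PollTaskSketch {

    // Calls receive until it returns null or the limit is reached.
    // A negative limit means "no limit", mirroring maxMessagesPerPoll(-1).
    public static <T> List<T> runPollTask(Supplier<T> receive, int maxMessagesPerPoll) {
        List<T> received = new ArrayList<>();
        while (maxMessagesPerPoll < 0 || received.size() < maxMessagesPerPoll) {
            T message = receive.get();
            if (message == null) {
                break; // the destination is empty, the task ends
            }
            received.add(message);
        }
        return received;
    }

    public static void main(String[] args) {
        // Stand-in for the MQ destination with three waiting messages.
        Queue<String> broker = new ArrayDeque<>(Arrays.asList("m1", "m2", "m3"));
        System.out.println(runPollTask(broker::poll, 1));  // prints [m1]
        System.out.println(runPollTask(broker::poll, -1)); // prints [m2, m3]
    }
}
```

With a limit of 1, the remaining messages would have to wait for the next run, 10 minutes later, which is why the unlimited setting matters here.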
Another variant is possible with an aggregator and its groupTimeout option. This way the aggregator does not emit an output message until the 10-minute interval has passed. But again, its message store is in memory by default, and I wouldn't introduce one more persistent store just to satisfy a periodic requirement when we already have an MQ that we can poll directly. Therefore I would go with the JmsDestinationPollingSource variant.
UPDATE
Can you help me with how to set a fixed delay on the file outbound adapter?
Since you are dealing with a QueueChannel, the "fixed delay" has to be configured on a PollingConsumer endpoint. That endpoint belongs to the subscriber of the channel, which here is the .handle(Files.outboundAdapter) part. What you are missing is that the poller is an option of the endpoint, not of the MessageHandler. Consider using the overloaded handle() variant:
.handle(Files.outboundAdapter(new File(config.getWorkingDir()))
        .fileNameGenerator(fileNameGenerator())
        .fileExistsMode(FileExistsMode.APPEND).appendNewLine(true),
    // fixedDelay is in milliseconds: 10000 is 10 seconds, use 600000 for 10 minutes
    e -> e.poller(p -> p.fixedDelay(10000)))
Or, as a sample for the JmsDestinationPollingSource:
@Bean
public IntegrationFlow jmsInboundFlow() {
    return IntegrationFlows
            .from(Jms.inboundAdapter(cachingConnectionFactory())
                    .destination("jmsInbound"),
                e -> e.poller(p -> p.fixedDelay(10000)))
            .<String, String>transform(String::toUpperCase)
            .channel(jmsOutboundInboundReplyChannel())
            .get();
}
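The fileNameGenerator() bean used in the file-writing flow is not shown in the question. As a rough sketch, and purely as an assumption about what it could do, the file name can be derived from the 10-minute window a timestamp falls into, so that everything written within the same window is appended to the same file. The class WindowFileName, the "messages-" prefix, the ".txt" suffix and the use of UTC are all illustrative choices, not something taken from the original code.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: maps a timestamp to the file of its 10-minute window.
public class WindowFileName {

    private static final long WINDOW_MILLIS = 10 * 60 * 1000L;

    // UTC is an arbitrary choice for the sketch; any fixed zone works.
    private static final DateTimeFormatter FORMAT =
            DateTimeFormatter.ofPattern("yyyyMMdd-HHmm").withZone(ZoneOffset.UTC);

    public static String forTimestamp(long epochMillis) {
        // Round down to the start of the enclosing 10-minute window.
        long windowStart = Math.floorDiv(epochMillis, WINDOW_MILLIS) * WINDOW_MILLIS;
        return "messages-" + FORMAT.format(Instant.ofEpochMilli(windowStart)) + ".txt";
    }

    public static void main(String[] args) {
        System.out.println(forTimestamp(0L));       // prints messages-19700101-0000.txt
        System.out.println(forTimestamp(599_999L)); // prints messages-19700101-0000.txt
        System.out.println(forTimestamp(600_000L)); // prints messages-19700101-0010.txt
    }
}
```

A FileNameGenerator implementation could call such a method with the current time or with the message timestamp. Note that a batch written right at a window boundary may still be split across two files, so this is only an approximation of "one file per poll".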