I am trying to configure the following flow: when a message arrives on a Rabbit queue, try to acquire a lock; query a remote file server for files and send a new message to another queue for each file found; then release the lock once all the files have been sent.
IntegrationFlows.from(Amqp.inboundGateway(container)
                .messageConverter(messageConverter))
        // drop the message if the lock cannot be acquired
        .filter(m -> lockService.acquire())
        .transform(m -> remoteFileTemplate.list(inputDirectory))
        .split()
        .handle(Amqp.outboundAdapter(amqpTemplate)
                .exchangeName("")
                .routingKey(routingKey))
        // the flow stops at the handler above, so these steps never run
        .aggregate()
        .handle(m -> {
            log.info("Releasing lock");
            lock.release();
        })
        .get();
The problem is that the flow stops after the first .handle method (honestly, as expected), and I cannot figure out how to configure it to do what I want. I tried using .wireTap and .publishSubscribeChannel, but that creates two flows that are independent of each other, and my lock gets released before the files are actually sent.
It would be great if someone could explain how to fix it using the DSL, because I am creating these flows dynamically.
My attempt at setting the interceptor on the channel:
final DirectChannel channel = new DirectChannel();
channel.setInterceptors(Collections.singletonList(new ChannelInterceptor() {

    @Override
    public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
        lockService.acquire();
        return message;
    }

    @Override
    public void afterSendCompletion(final Message<?> message, final MessageChannel channel,
                                    final boolean sent, final Exception ex) {
        lock.release();
    }
}));
IntegrationFlows.from(Amqp.inboundGateway(container)
                .messageConverter(messageConverter))
        .channel(channel)
        .transform(m -> remoteFileTemplate.list(inputDirectory))
        .split()
        .handle(Amqp.outboundAdapter(amqpTemplate)
                .exchangeName("")
                .routingKey(routingKey))
        .get();
But this way the lock is acquired and released, and only then are the messages fetched. What am I doing wrong?
Figured it out with help from the Gitter chat, in case someone else gets stuck:
IntegrationFlows.from(Amqp.inboundGateway(container)
                .messageConverter(messageConverter))
        .channel(MessageChannels.direct().interceptor(new ChannelInterceptor() {

            @Override
            public Message<?> preSend(final Message<?> message, final MessageChannel channel) {
                lockService.acquire();
                return message;
            }

            @Override
            public void afterSendCompletion(final Message<?> message, final MessageChannel channel,
                                            final boolean sent, final Exception ex) {
                lockService.release();
            }
        }))
        .transform(m -> remoteFileTemplate.list(inputDirectory))
        .split()
        .handle(Amqp.outboundAdapter(amqpTemplate)
                .exchangeName("")
                .routingKey(routingKey))
        .get();
A pub/sub channel after the split, with the AMQP handler on one subflow and the aggregator on the other, should work fine.
Both subflows are called consecutively on the same thread for each message, and the final message causes the aggregator to release the group, again on that same thread.
Having said that, you will need some errorChannel processing on the inbound gateway to release the lock in the event that an error occurs.
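The ordering argument for the pub/sub approach can be illustrated with a minimal plain-Java simulation. This is only a sketch, not Spring Integration code: the class PubSubOrderSketch and its run method are hypothetical names, the "send" step merely stands in for the AMQP outbound adapter, and the size check stands in for the aggregator's release of the group.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java simulation of the ordering only; no Spring Integration API is used.
// PubSubOrderSketch and run are hypothetical names introduced for illustration.
final class PubSubOrderSketch {

    /** Simulates split -> pub/sub with two subscribers on one thread and returns the event log. */
    static List<String> run(List<String> files) {
        List<String> events = new ArrayList<>();
        List<String> aggregated = new ArrayList<>();
        events.add("acquire");
        for (String file : files) {              // the splitter emits one message per file
            events.add("send " + file);          // subscriber 1: stands in for the AMQP adapter
            aggregated.add(file);                // subscriber 2: stands in for the aggregator
            if (aggregated.size() == files.size()) {
                events.add("release");           // group complete: the lock is released last
            }
        }
        return events;
    }
}
```

Calling PubSubOrderSketch.run(List.of("a.txt", "b.txt")) yields the log acquire, send a.txt, send b.txt, release, so the release always follows the last send. In this simulation an empty file list never reaches the release step, and a failure in the send step would skip it as well, which is the case the error handling mentioned above has to cover.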
EDIT
A less complex solution would be a custom ChannelInterceptor on the channel before the transform, instead of the filter, that acquires the lock in preSend() and releases it in afterSendCompletion() (which is called for both success and failure).
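Why the interceptor keeps the lock held for the whole flow can be sketched in plain Java. A direct channel dispatches to its subscriber on the sender's thread, so everything downstream runs between the two interceptor callbacks. The code below is a simulation under that assumption, not Spring Integration code: InterceptedSendSketch and send are hypothetical names, and unlike a real channel the sketch records the failure instead of rethrowing it so that the event log can be returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Plain-Java simulation of a synchronous send wrapped by two hooks; no Spring API is used.
// InterceptedSendSketch and send are hypothetical names introduced for illustration.
final class InterceptedSendSketch {

    /** Runs the downstream step for every file between an acquire hook and a release hook. */
    static List<String> send(List<String> files, Consumer<String> downstream) {
        List<String> events = new ArrayList<>();
        events.add("acquire");                   // plays the role of preSend()
        try {
            for (String file : files) {
                downstream.accept(file);         // downstream work runs on the caller's thread
                events.add("sent " + file);
            }
        } catch (RuntimeException ex) {
            // recorded rather than rethrown, only so the log can be inspected
            events.add("failed: " + ex.getMessage());
        } finally {
            events.add("release");               // plays the role of afterSendCompletion()
        }
        return events;
    }
}
```

With a downstream step that throws on the second file, the log is acquire, sent a.txt, failed: boom, release: the release runs on both the success and the failure path, which is the property the interceptor approach relies on.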