Earlier all worked fine. But a few days ago, handling stops in the end of one flow and doesn't go further.
We successfully come to this flow:
@Bean
public IntegrationFlow collectFlow() {
return IntegrationFlows
.from(ChannelConfig.COLLECT_CHANNEL)
.log(LoggingHandler.Level.INFO, logger.getName(), m -> Utils.addDescription(m,
"Starting data collection"))
.<CollectionContainer>handle((p, h) ->
Utils.wrapService(p, h, collectionService1, logger))
.<CollectionContainer>handle((p, h) ->
Utils.wrapService(p, h, collectionService2, logger))
.<CollectionContainer>handle((p, h) ->
Utils.wrapService(p, h, collectionService3, logger))
....
....
.<CollectionContainer>handle((p, h) ->
Utils.wrapService(p, h, collectionServiceN, logger))
.log(LoggingHandler.Level.DEBUG, logger.getName(), m -> Utils.addDescription(m,
"Data collection completed"))
.channel(ChannelConfig.PROCESS_CHANNEL)
.get();
}
And doesn't go to another:
@Bean
public IntegrationFlow processFlow() {
return IntegrationFlows
.from(ChannelConfig.PROCESS_CHANNEL)
.log(LoggingHandler.Level.INFO, logger.getName(), m -> Utils.addDescription(m,
"Starting data process"))
.<CollectionContainer>handle((p, h) ->
Utils.wrapService(p, h, processingService1, logger))
.<CollectionContainer>handle((p, h) ->
Utils.wrapService(p, h, processingService2, logger))
.<CollectionContainer>handle((p, h) ->
Utils.wrapService(p, h, processingService3, logger))
....
....
.<CollectionContainer>handle((p, h) ->
Utils.wrapService(p, h, processingServiceN, logger))
.log(LoggingHandler.Level.DEBUG, logger.getName(), m -> Utils.addDescription(m,
"Data process completed"))
.channel(ChannelConfig.UPDATE_CHANNEL)
.get();
In log files last line is just "Data collection completed". I turned on a debug/trace level on spring integration:
org.springframework.integration: trace
no other log line after "Data collection completed"
Channel configuration:
@Bean
public MessageChannel collectChannel() {
return MessageChannels.queue(COLLECT_CHANNEL, COLLECT_CHANNEL_QUEUE_CAPACITY).get();
}
@Bean
public MessageChannel processChannel() {
return MessageChannels.queue(PROCESS_CHANNEL, PROCESS_CHANNEL_QUEUE_CAPACITY).get();
}
How can I find out what's going on? Why flow doesn't go further?
I suggest you to upgrade to the latest Spring Integration: https://spring.io/projects/spring-integration#learn. Well, there is probably enough for you to upgrade to the latest Spring Boot instead: https://spring.io/projects/spring-boot#learn. It does pull the latest Spring Integration according to its dependency management.
I'm not sure why you need both those channel to be as a queue
, but you can try to increase a TaskScheduler
thread pool: https://docs.spring.io/spring-boot/docs/current/reference/html/features.html#features.task-execution-and-scheduling
spring.task.scheduling.pool.size=10
By default it is 1
and feels like that thread is busy with something else, e.g. polling one queue, but it does not pass to another one. In the latest Spring Integration we have fixed the receive timeout to be non-blocked by default: https://github.com/spring-projects/spring-integration/wiki/Spring-Integration-6.0-to-6.1-Migration-Guide#do-not-block-by-default
Otherwise I might would need to ask you to share with us a simple project to let us to reproduce on our side.