spring-boot, spring-integration, spring-integration-dsl

How to know the running status of a Spring Integration flow


I have a simple integration flow, written in the DSL style, that polls data from a database on a cron schedule, publishes it to a DirectChannel, splits and transforms it, publishes to another channel backed by an executor service, does some operations, and finally publishes to an output channel.

I also have an endpoint that can receive an HTTP request to trigger this flow manually; when it does, I send the messages to one of the channels mentioned above.

I want to make sure that the manual trigger doesn’t happen if the flow is already running due to either the cron job or another request.

I have used the isRunning method of the StandardIntegrationFlow, but it seems that it’s not thread safe.

I also tried using .wireTap(myService) and .handle(myService), where the service holds an AtomicBoolean flag, but the flag gets set for every message, which is not a solution.

I want to know whether the flow is running without much intervention on my side. If this is not supported, how can I apply the AtomicBoolean logic to the flow as a whole rather than to every message?

How can I simulate the race condition in a test to make sure my implementation prevents it?


Solution

  • The IntegrationFlow is just a logical container for the configuration phase. It does have lifecycle methods, but they exist only for internal framework logic. Even so, they don't help here, because the endpoints have to be running at all times if you want them to react to an event or an input message.

    Controlling all of this is hard because, as you describe, the processing is asynchronous. Even if we stop the SourcePollingChannelAdapter at the beginning of the flow so that your manual call can do its work, that doesn't mean messages in other threads are no longer being processed. An AtomicBoolean cannot help here for the same reason: even if you set it to true in MessageSourceMutator.beforeReceive() and reset it to false in afterReceive() when the message is null, that still doesn't mean the messages you pushed downstream to other threads have already been processed.

    You might consider using an aggregator to reset the AtomicBoolean at the end of a batch: since you say you pull data from the DB, there is presumably a number of records per poll that you can track downstream. That way, your manual call would be skipped until the aggregator has collected the results for that batch.

    You also need to think about stopping the SourcePollingChannelAdapter at the moment the manual action is permitted, so that there are no further race conditions with the cron trigger.
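
To make the batch-level idea concrete, here is a minimal sketch in plain java.util.concurrent. It is not Spring Integration API: BatchGate and all of its methods are hypothetical names invented for illustration. The assumption is that both the cron poll and the HTTP endpoint call tryAcquire() before starting work, that expect() is called once the number of records in the batch is known, and that messageDone() is called once per message at the very end of the flow (or once for the whole batch when an aggregator releases it). The countWinners() helper is one possible way to simulate the race in a test: it holds several threads on a latch and releases them at the same instant.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical batch-level guard; not part of Spring Integration. */
final class BatchGate {
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicInteger remaining = new AtomicInteger(0);

    /** Atomically claims the gate; returns false if a batch is already in flight. */
    boolean tryAcquire() {
        return running.compareAndSet(false, true);
    }

    /**
     * Declares how many messages the claimed batch contains. Must be called
     * before any message of the batch is sent downstream. An empty batch
     * releases the gate immediately.
     */
    void expect(int batchSize) {
        if (batchSize <= 0) {
            running.set(false);
        } else {
            remaining.set(batchSize);
        }
    }

    /** Call once per fully processed message; the last one releases the gate. */
    void messageDone() {
        if (remaining.decrementAndGet() == 0) {
            running.set(false);
        }
    }

    boolean isRunning() {
        return running.get();
    }

    /**
     * Race simulation: starts the given number of threads, releases them all
     * at once, and returns how many of them managed to claim a fresh gate.
     */
    static int countWinners(int contenders) {
        BatchGate gate = new BatchGate();
        ExecutorService pool = Executors.newFixedThreadPool(contenders);
        CountDownLatch startSignal = new CountDownLatch(1);
        AtomicInteger winners = new AtomicInteger();
        for (int i = 0; i < contenders; i++) {
            pool.execute(() -> {
                try {
                    startSignal.await(); // park until every contender is ready
                    if (gate.tryAcquire()) {
                        winners.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        startSignal.countDown(); // release all contenders together
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("contenders did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("winners: " + countWinners(8));
    }
}
```

Because the gate is claimed with compareAndSet and released only after the last message of the batch has been counted, a manual trigger that arrives mid-batch is rejected rather than interleaved. In a test, asserting that countWinners(n) returns 1 for several values of n is a reasonable check that only one trigger can win. The sketch does not cover error paths: a message that fails and never reaches messageDone() would leave the gate closed, so a real flow would also need to count failures or apply a timeout.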