Search code examples
javaspring-integration

How to correctly implement delay in Spring Integration Flow


I am trying to implement a delay in a Spring Integration Flow.
I have one flow that is starting a process on another server and then I am checking after a delay if that process is completed or not.
When completed the flow should move to the next phase.
This seems to work it also shows in logs (and, clearly, in the flow itself), a long list of repetitions in the runexampleScriptWaiting channel.

I tried removing that channel change but then the flow gets stuck in that phase forever, never moving to completion.

How can I implement this so that a single runexampleScriptWaiting is shown / executed (something similar to a non-blocking while loop, I guess)?

I considered keeping it as is and just update my monitoring application (a very small frontend that shows which channels are in the payload's history) in order to get rid of duplicated channel lines but I also wondered if there is a better / more robust way to do this.

Here's a simplified example:

@Bean
public IntegrationFlow exampleIntegrationFlow() {
    return IntegrationFlows
            .from(exampleConfig.runexampleScript.get())
            .<ExamplePayload>handle((payload, messageHeaders) -> examplePayloadService
                    .changeExampleServiceRequestStatus(payload, ExampleServiceStatus.STARTED))
            .<ExamplePayload>handle(
                    (payload, messageHeaders) -> exampleScriptService.runexample(payload))
            .channel(exampleConfig.runexampleScriptWaiting.get())
            .<ExamplePayload, Boolean>route(jobStatusService::areJobsFinished,
                    router -> router
                            .subFlowMapping(true, exampleSuccessSubflow())
                            .subFlowMapping(false, exampleWaitSubflow())
                            .defaultOutputToParentFlow()
            )
            .get();
}

@Bean
public IntegrationFlow exampleWaitSubflow() {
    return IntegrationFlows
            .from(exampleConfig.runexampleScriptWaiting.get())
            .<ExamplePayload>handle(
                    (payload, messageHeaders) -> {
                        interruptIgnoringSleep(1000);
                        return payload;
                    })
            .channel(exampleConfig.runexampleScriptWaiting.get()) // Commenting this gets the process stuck
            .get();

}

Solution

  • It is not clear what is your exampleConfig.runexampleScriptWaiting.get(), but what you have so far in the config is not OK. You have two subscribers to the same channel:

    1. .channel(exampleConfig.runexampleScriptWaiting.get()) and the next route()

    2. .from(exampleConfig.runexampleScriptWaiting.get()) and the next handle()

    This may cause unexpected behavior, e.g. round-robin messages distribution.

    I would do filter() and delay() instead in addition to an ExecutorChannel since you are asking about non-blocking retry:

    .channel(exampleConfig.runexampleScriptWaiting.get())
    .filter(jobStatusService::areJobsFinished, 
                filter -> filter.discardFlow(
                              discardFlow -> discardFlow
                                                .delay(1000)
                                                .channel(exampleConfig.runexampleScriptWaiting.get())))
    

    The exampleSuccessSubflow could go just after this filter() as part of this flow or via to(exampleSuccessSubflow()).

    Pay attention to that discardFlow: we delay non-finished message a little bit and produce it back to that runexampleScriptWaiting channel for calling this filter again. If you make this channel as an ExecutorChannel (or QueueChannel), your wait functionality is going to be non-blocking. But at the same time your main flow is still going to be blocked for this request since you continue waiting for reply. Therefore it might not make too much sense to make this filtering logic as non-blocking and you can still use that Thread.sleep() instead of delay().

    The router solution also may work, but you cannot use that runexampleScriptWaiting channel as an input of that sub-flow. Probably that's the reason behind that your problem with "process stuck".