I have an integration flow that regularly polls a database for MachineLine entities that have not yet been processed.
The flow retrieves a collection of MachineLine objects, which I would like to split into individual objects,
transform into ReportDetails objects, and persist into another table in the database. The flow is defined as follows:
    @Bean
    public IntegrationFlow processMachineLine() {
        return IntegrationFlows
                .from(Jpa.inboundAdapter(this.entityManager)
                                .entityClass(MachineLine.class)
                                .jpaQuery(this.machineService.retrieveUnprocessedLinesQuery()),
                        e -> e.poller(Pollers.fixedDelay(5000)))
                .split()
                .transform(MachineLine.class, this::transformMachineLineToReportDetails)
                .handle(Jpa.outboundAdapter(this.entityManager)
                                .entityClass(ReportDetails.class),
                        ConsumerEndpointSpec::transactional)
                .get();
    }
The above definition works, but it is slow. The transformMachineLineToReportDetails method sends an HTTP request to another service that takes several seconds to respond. With the current flow definition, each MachineLine object waits for the previous one to be transformed and persisted before its own processing starts.
So, the ideal solution would be to perform this transformation and persistence asynchronously. A possible solution would be to insert the following line between .split() and .transform(...):
.channel(new ExecutorChannel(Executors.newCachedThreadPool()))
However, this allows the JPA inbound adapter to poll the database again before the results of the last poll have been processed and persisted. Any MachineLine entities returned by the previous poll that have not yet been transformed and persisted when the next poll runs are retrieved a second time, and the flow tries to transform and persist them again. This wastes resources and also produces an error when multiple ReportDetails objects with the same ID are persisted to the database.
Is there a way I can asynchronously transform the MachineLine objects but make sure the database is not polled again until the results of the previous poll have completed their journey through the flow (i.e. all the MachineLine objects are transformed and persisted)?
The only way I see is via a custom AbstractMessageSourceAdvice that checks an AtomicBoolean flag (it could be a bean, too) in its beforeReceive(). Since you use Pollers.fixedDelay(5000), your polling policy is still single-threaded, so it is safe to stop that same thread from polling JPA whenever the AbstractMessageSourceAdvice does not allow it. The boolean flag should be true in the beginning, and you change it to false before the mentioned split(). You can do that using a publishSubscribeChannel() with two subscribers. Or even do that in the AbstractMessageSourceAdvice implementation itself, with something like compareAndSet(true, false) in that beforeReceive() implementation.
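
The compareAndSet(true, false) check can be illustrated without any Spring Integration classes. The following is a minimal plain-Java sketch, not the actual advice: PollGate, tryStartPoll() and release() are hypothetical names standing in for the flag, for the check a beforeReceive() implementation might make, and for the reset at the end of the flow.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the polling flag; not a Spring Integration class.
class PollGate {
    // true = a poll is allowed; starts open so the first poll can run.
    private final AtomicBoolean open = new AtomicBoolean(true);

    // What a beforeReceive() implementation could return: atomically closes
    // the gate and reports whether this call was the one that closed it.
    boolean tryStartPoll() {
        return open.compareAndSet(true, false);
    }

    // What the end of the flow would call once every item has been persisted.
    void release() {
        open.set(true);
    }

    public static void main(String[] args) {
        PollGate gate = new PollGate();
        System.out.println(gate.tryStartPoll()); // true: first poll proceeds
        System.out.println(gate.tryStartPoll()); // false: previous batch still in flight
        gate.release();
        System.out.println(gate.tryStartPoll()); // true: polling resumes
    }
}
```

One thing to watch with this variant: if a poll returns no message, nothing reaches the end of the flow, so the flag presumably has to be set back to true somewhere else, for example in the advice's afterReceive() when the result is null.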
Then you split and, as you mentioned, use an ExecutorChannel so that the transformation and persistence run asynchronously.
At the end of your flow you need to place a publishSubscribeChannel() with two subscribers: 1. handle(Jpa.outboundAdapter(this.entityManager)...); 2. aggregate() to wait for all the split items to be completed. After that aggregate() you place a simple handle(m -> pollingFlagBean().set(true)).
That's all: a new poll will happen only when all the items have been processed and aggregated back into the group. Only after that do you let your poller go again using that AtomicBoolean.
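
The whole round trip (close the gate, split, hand off to other threads, wait for every item, reopen the gate) can be simulated in plain Java. This is only a sketch under stated assumptions: GatedBatchFlow, poll() and demo() are hypothetical names, an AtomicInteger countdown plays the role that aggregate() has in the real flow, and a CountDownLatch stands in for the slow HTTP call. It does not use Spring Integration at all.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Plain-Java simulation of the gated flow; none of these names are Spring Integration APIs.
class GatedBatchFlow {
    private final AtomicBoolean pollingAllowed = new AtomicBoolean(true);
    private final ExecutorService pool;
    private final Consumer<String> work; // stands in for transform + persist

    GatedBatchFlow(ExecutorService pool, Consumer<String> work) {
        this.pool = pool;
        this.work = work;
    }

    // One poll attempt. Returns false when the previous batch is still in flight.
    boolean poll(List<String> batch) {
        if (!pollingAllowed.compareAndSet(true, false)) {
            return false; // gate closed: skip this poll
        }
        if (batch.isEmpty()) {
            pollingAllowed.set(true); // nothing to wait for, reopen at once
            return true;
        }
        // Plays the aggregator's role: counts down until the whole batch is done.
        AtomicInteger remaining = new AtomicInteger(batch.size());
        for (String item : batch) {   // the "split"
            pool.execute(() -> {      // the hand-off to another thread
                try {
                    work.accept(item);
                } finally {
                    if (remaining.decrementAndGet() == 0) {
                        pollingAllowed.set(true); // last item done: reopen the gate
                    }
                }
            });
        }
        return true;
    }

    // Runs three poll attempts and reports which ones were accepted.
    static List<Boolean> demo() {
        ExecutorService pool = Executors.newCachedThreadPool();
        CountDownLatch slowService = new CountDownLatch(1); // simulates the slow HTTP call
        GatedBatchFlow flow = new GatedBatchFlow(pool, item -> {
            try {
                slowService.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        List<Boolean> accepted = new ArrayList<>();
        try {
            accepted.add(flow.poll(List.of("line-1", "line-2"))); // accepted
            accepted.add(flow.poll(List.of("line-1", "line-2"))); // rejected: batch in flight
            slowService.countDown();                              // let the items finish
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
            accepted.add(flow.poll(List.of()));                   // accepted again
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [true, false, true]
    }
}
```

The second attempt is rejected because both items are still blocked on the simulated service, which is exactly the duplicate retrieval the flag is meant to prevent. The finally block matters: if an item fails and the countdown is skipped, the gate would never reopen, and the real flow would need similar care around errors before the aggregator.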
You may also consider combining this flag logic with a SimpleActiveIdleMessageSourceAdvice to switch the polling period between active and idle modes, so that you avoid a long idle wait while the aggregation is pending.
Any other async solution still won't work for you, because switching to another thread releases the polling thread immediately and lets it spin again.