I have the following flow that I would like to implement using the Spring Integration Java DSL:
I have working Java code that does exactly these steps. An additional requirement that I'm struggling with is that polling for the next round of documents shouldn't happen until all the documents from the previous poll have been processed and stored in the database.
Is there any pattern in Spring Integration that I could use for this additional requirement?
Here is the simplified code. It will get more complex, and I'll split the processing of the documents (HTTP outbound and persisting) into separate classes / flows:
return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
                .entityClass(ProcessingMetadata.class)
                .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                        "where p.status = com.test.ProcessingStatus.PROCESSED")
                .maxResults(1)
                .expectSingleResult(true),
        e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
        .handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
                .entityClass(DocumentHeader.class)
                .jpaQuery("from DocumentHeader d where d.modified > :modified")
                .parameterExpression("modified", "payload"))
        .handle(Http.outboundGateway(uri)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class))
        .handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
                .entityClass(ProcessingMetadata.class)
                .persistMode(PersistMode.PERSIST),
        e -> e.transactional(true))
        .get();
UPDATE
Following Artem's suggestion, I'm trying to implement it using a SimpleActiveIdleMessageSourceAdvice:
class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {

    public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
        super(trigger);
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return false;
    }
}
If I understand it correctly, the code above would stop polling. Now I have no idea how to attach this Advice to the Jpa.inboundAdapter. It doesn't seem to have a suitable method (neither Advice nor Spec Handler). Am I missing something obvious here? I've tried attaching the Advice to the Jpa.retrievingGateway, but it doesn't change the flow at all.
UPDATE2
Check this question for a complete solution: Spring Integration: how to unit test an advice
I answered a similar question today: How to poll from a queue 1 message at a time after downstream flow is completed in Spring Integration.
You could also use a trick at the database level: do not let new records in the table become visible while others are locked. Or you could run an UPDATE at the end of the flow, so that your SELECT won't see the relevant records until they have been updated.
In any case, any of the approaches I suggested for that question should apply here as well.
Also, you can indeed consider relying on SimpleActiveIdleMessageSourceAdvice, since your solution is already based on a MessageSource implementation.
UPDATE
For your use case it would probably be better to extend SimpleActiveIdleMessageSourceAdvice and override its beforeReceive() to check some state that tells whether you are able to read more data. The idlePollPeriod and activePollPeriod could be the same value: it doesn't look like it makes sense to change the period in between, since you go to the idle state right after reading the next set of data.
The state to check could be a simple AtomicBoolean bean that you change after you have processed the current set of documents. That might happen after an aggregator or whatever else you use in your solution.
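As a minimal sketch of that state check, here is the gate logic on its own, using only the JDK so it can be run without Spring. The class name PollGate and its method names are hypothetical. The assumption is that the overridden beforeReceive() would return tryStartBatch(), and that the last step of the flow (for example, something after an aggregator) would call markBatchCompleted().

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical gate shared by the poller advice and the end of the flow.
 * An open gate (true) means the next poll may run.
 */
class PollGate {

    // Starts open so that the very first poll is allowed.
    private final AtomicBoolean ready = new AtomicBoolean(true);

    /**
     * Meant to be called from an overridden beforeReceive().
     * Atomically closes the gate and returns true only if it was open,
     * so later polls are skipped until markBatchCompleted() is called.
     */
    boolean tryStartBatch() {
        return ready.compareAndSet(true, false);
    }

    /** Meant to be called once the last document of the batch is stored. */
    void markBatchCompleted() {
        ready.set(true);
    }

    public static void main(String[] args) {
        PollGate gate = new PollGate();
        System.out.println(gate.tryStartBatch()); // true: first poll proceeds
        System.out.println(gate.tryStartBatch()); // false: batch still in flight
        gate.markBatchCompleted();
        System.out.println(gate.tryStartBatch()); // true: next poll proceeds
    }
}
```

One thing to watch with this sketch: if a poll proceeds but produces no message, nothing downstream runs, so nothing reopens the gate. In that case the advice would probably need to reopen it itself, for example in afterReceive() when the result is null.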
UPDATE 2
To use WaitUntilCompleted for your Jpa.inboundAdapter you should have a configuration like this:
IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
                .entityClass(ProcessingMetadata.class)
                .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                        "where p.status = com.test.ProcessingStatus.PROCESSED")
                .maxResults(1)
                .expectSingleResult(true),
        e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)).advice(waitUntilCompleted())))
Pay attention to the .advice(waitUntilCompleted()) call, which is part of the poller configuration and points to your advice bean.