Search code examples
javaspringspring-integrationspring-integration-dsl

Spring Integration: start JPA polling only when all the results of last polling has been processed


I have a following flow that I would like to implement using Spring Integration Java DSL:

  1. Poll a table in a database every 2 hours which returns id of documents that need to be processed
  2. For each id, process a document through an HTTP gateway
  3. Store a response in a database

I have a working Java code that does exactly these steps. An additional requirement that I'm struggling with is that the polling for the next round of documents shouldn't happen until all the documents from the last polling has been processed and stored in the database.

Is there any pattern in Spring Integration that I could use for this additional requirement?

Here is a simplified code - it will get more complex and I'll split processing of the documents (HTTP outbound and persisting) into separate classes / flows:

return IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
                .entityClass(ProcessingMetadata.class)
                .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                        "where p.status = com.test.ProcessingStatus.PROCESSED")
                .maxResults(1)
                .expectSingleResult(true),
        e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10))))
        .handle(Jpa.retrievingGateway(this.sourceEntityManagerFactory)
                .entityClass(DocumentHeader.class)
                .jpaQuery("from DocumentHeader d where d.modified > :modified")
                .parameterExpression("modified", "payload"))
        .handle(Http.outboundGateway(uri)
                .httpMethod(HttpMethod.POST)
                .expectedResponseType(String.class))
        .handle(Jpa.outboundAdapter(this.targetEntityManagerFactory)
                        .entityClass(ProcessingMetadata.class)
                        .persistMode(PersistMode.PERSIST),
                e -> e.transactional(true))
        .get();

UPDATE

Following Artem's suggestion, I'm trying to implement it using a SimpleActiveIdleMessageSourceAdvice

class WaitUntilCompleted extends SimpleActiveIdleMessageSourceAdvice {

    public WaitUntilCompleted(DynamicPeriodicTrigger trigger) {
        super(trigger);
    }

    @Override
    public boolean beforeReceive(MessageSource<?> source) {
        return false;
    }
}

If I understand it correctly, above code would stop polling. Now I have no idea how to attach this Advice to the Jpa.inboundAdapter... It doesn't seem to have a proper method (neither Advice nor Spec Handler). Do I miss something obvious here? I've tried attaching the Advice to the Jpa.retrievingGateway but it doesn't change the flow at all.

UPDATE2

Check this question for a complete solution: Spring Integration: how to unit test an advice


Solution

  • I have answered today for similar question: How to poll from a queue 1 message at a time after downstream flow is completed in Spring Integration.

    You also may have a trick on database level do not let to see new records in the table while others are locked. Or you can have some UPDATE in the end of flow while your SELECT won't see appropriate records until they are updated respectively.

    But anyway any of those approaches I suggest for that question should be applied here as well.

    Also you indeed can consider to rely on the SimpleActiveIdleMessageSourceAdvice since your solution is already based on a MessageSource implementation.

    UPDATE

    For your use-case it is probably would be better to extend that SimpleActiveIdleMessageSourceAdvice and override its beforeReceive() to check some state that you are able to read more data or not. The idlePollPeriod and activePollPeriod could be the same value: doesn't look like it make sense to change it in between since you are going to the idle state just after reading the next set of data.

    For the state to check it really might be a simple AtomicBoolean bean which you should change after you process the current set of documents. That might be something after an aggregator or anything else you can use in your solution.

    UPDATE 2

    To use a WaitUntilCompleted for your Jpa.inboundAdapter you should have a configuration like this:

    IntegrationFlows.from(Jpa.inboundAdapter(this.targetEntityManagerFactory)
                .entityClass(ProcessingMetadata.class)
                .jpaQuery("select max(p.modifiedDate) from ProcessingMetadata p " +
                        "where p.status = com.test.ProcessingStatus.PROCESSED")
                .maxResults(1)
                .expectSingleResult(true),
        e -> e.poller(Pollers.fixedDelay(Duration.ofSeconds(10)).advice(waitUntilCompleted())))
    

    Pay attention to the .advice(waitUntilCompleted()) which is a part of the pller configuration and points to your advice bean.