How to prevent a message from being polled more than once using Spring Integration?


I'm using Spring Integration and JpaPollingChannelAdapter to poll entries with a certain state from a database. If any entries are found, they are then processed.

Is there any way to prevent an entry from being polled twice?

I've tried using JpaOutboundGateway to update the state on the JPA entity, but if the polling frequency is high, the entry is polled more than once before it gets updated.

Here's my config:

@Bean
public MessageChannel defaultInputChannel() {
    return new DirectChannel();
}

@Bean
public JpaExecutor jpaFindAllExecutor() {
    JpaExecutor jpaExecutor = new JpaExecutor(this.entityManagerFactory);
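    // The argument is an inline JPQL string, so setJpaQuery applies;
    // setNamedQuery expects the name of a named query instead.
    jpaExecutor.setJpaQuery("SELECT o FROM MyEntity o WHERE o.status=0");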
    return jpaExecutor;
}

@Bean
@InboundChannelAdapter(poller = @Poller(maxMessagesPerPoll = "${poller.maxMessagesPerPoll}", fixedDelay = "${poller.interval}", receiveTimeout = "${poller.receiveTimeout}"))
public MessageSource<?> pollingSource() {
    return new JpaPollingChannelAdapter(jpaFindAllExecutor());
}

Solution

  • The JpaExecutor comes with a deleteAfterPoll option:

    https://docs.spring.io/spring-integration/docs/current/reference/html/jpa.html#jpa-inbound-channel-adapter

    Set this value to true if you want to delete the rows received after execution of the query. You must ensure that the component operates as part of a transaction.

    If you would rather UPDATE than DELETE, consider using the corresponding Hibernate annotation on your entity:

    @SQLDelete(sql = "Update MyEntity set status = 1 where id = ?")
    

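    According to your @InboundChannelAdapter configuration, the JpaOutboundGateway approach should work as well. Perhaps you shift the processing from one thread to another somewhere downstream. As long as you don't leave the polling thread, no more data is going to be polled, because with fixedDelay the next poll starts only after the current one has finished.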
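
The deleteAfterPoll approach could be wired up roughly as follows. This is a minimal sketch, not a definitive configuration: it assumes Spring Integration 6.x (hence jakarta.persistence and PeriodicTrigger(Duration)) and a PlatformTransactionManager bean in the context. The class name PollOnceConfig, the bean name transactionalPoller, the one-second delay and the maxMessagesPerPoll value are illustrative assumptions; jpaFindAllExecutor, pollingSource and defaultInputChannel are taken from the question.

```java
import java.time.Duration;
import java.util.List;

import jakarta.persistence.EntityManagerFactory;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.jpa.core.JpaExecutor;
import org.springframework.integration.jpa.inbound.JpaPollingChannelAdapter;
import org.springframework.integration.scheduling.PollerMetadata;
import org.springframework.integration.transaction.TransactionInterceptorBuilder;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.support.PeriodicTrigger;
import org.springframework.transaction.PlatformTransactionManager;

@Configuration
public class PollOnceConfig { // hypothetical class name

    private final EntityManagerFactory entityManagerFactory;

    public PollOnceConfig(EntityManagerFactory entityManagerFactory) {
        this.entityManagerFactory = entityManagerFactory;
    }

    // DirectChannel: subscribers run on the polling thread, inside the poller's transaction.
    @Bean
    public MessageChannel defaultInputChannel() {
        return new DirectChannel();
    }

    @Bean
    public JpaExecutor jpaFindAllExecutor() {
        JpaExecutor jpaExecutor = new JpaExecutor(this.entityManagerFactory);
        jpaExecutor.setJpaQuery("SELECT o FROM MyEntity o WHERE o.status = 0");
        // Remove every polled entity; with @SQLDelete on MyEntity,
        // Hibernate issues the custom UPDATE instead of a DELETE.
        jpaExecutor.setDeleteAfterPoll(true);
        return jpaExecutor;
    }

    // Poller whose poll cycle runs inside a transaction, as deleteAfterPoll requires.
    @Bean
    public PollerMetadata transactionalPoller(PlatformTransactionManager transactionManager) {
        PollerMetadata poller = new PollerMetadata();
        poller.setTrigger(new PeriodicTrigger(Duration.ofSeconds(1))); // fixed delay (assumed value)
        poller.setMaxMessagesPerPoll(1);                               // assumed value
        poller.setAdviceChain(List.of(
                new TransactionInterceptorBuilder()
                        .transactionManager(transactionManager)
                        .build()));
        return poller;
    }

    @Bean
    @InboundChannelAdapter(channel = "defaultInputChannel", poller = @Poller("transactionalPoller"))
    public MessageSource<?> pollingSource() {
        return new JpaPollingChannelAdapter(jpaFindAllExecutor());
    }
}
```

The transaction only protects the processing as long as the flow stays on the polling thread. If a downstream channel hands the message to another thread (an ExecutorChannel or QueueChannel, for example), the poller's transaction commits without waiting for that work.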