I have a Spring configuration setup for polling messages from a db queue:
<int:annotation-config default-publisher-channel="messageChannel" />
<task:executor id="messageTaskExecutor" pool-size="1"
queue-capacity="1" rejection-policy="CALLER_RUNS" />
<int:transaction-synchronization-factory id="syncFactory">
<int:after-commit expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
<int:after-rollback expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>
<bean id="messageQueryProvider"
class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider" />
<bean id="messageSessionStore"
class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
<property name="dataSource" ref="dataSource" />
<property name="channelMessageStoreQueryProvider" ref="messageQueryProvider" />
<property name="tablePrefix" value="QUEUE_" />
<property name="usingIdCache" value="true" />
</bean>
<int:channel id="messageChannel">
<int:queue message-store="messageSessionStore" />
</int:channel>
<int:poller id="defaultPoller" fixed-delay="500" max-messages-per-poll="1" task-executor="messageTaskExecutor" default="true">
<int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="eosTransactionManager"/>
</int:poller>
However, the application runs on multiple nodes. When the server is restarted, it seems to happen that messages are picked up by more than 1 node (the nodes are all shut down at once and restarted in sequence). Is there any way to avoid multiple message processing?
That's somehow isn't possible using OracleChannelMessageStoreQueryProvider
. Just because we rely there on the FOR UPDATE SKIP LOCKED
. Therefore when SELECT
is performed by one node, the records are locked and the next one will go to the next free rows in the table.
There is a not in the JavaDoc for the setUsingIdCache()
:
* <p>If using the provided {@link OracleChannelMessageStoreQueryProvider}, don't set {@link #usingIdCache}
* to true, as the Oracle query will ignore locked rows.</p>
But I think it's fully unrelated. Removing that option and the <int:transaction-synchronization-factory>
you will simplify your configuration, but the behavior must not be changed.
I think what you see is like round-robin
: one node get a first row, the next one skip it and get the next one.
I somehow don't believe that different nodes get the same message when it is Oracle.