
Spring Integration message polling


I have a Spring configuration for polling messages from a database queue:

<int:annotation-config default-publisher-channel="messageChannel" />

<task:executor id="messageTaskExecutor" pool-size="1"
    queue-capacity="1" rejection-policy="CALLER_RUNS" />

<int:transaction-synchronization-factory id="syncFactory">
    <int:after-commit expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
    <int:after-rollback expression="@messageSessionStore.removeFromIdCache(headers.id.toString())" />
</int:transaction-synchronization-factory>

<bean id="messageQueryProvider"
    class="org.springframework.integration.jdbc.store.channel.OracleChannelMessageStoreQueryProvider" />

<bean id="messageSessionStore"
    class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource" />
    <property name="channelMessageStoreQueryProvider" ref="messageQueryProvider" />
    <property name="tablePrefix" value="QUEUE_" />
    <property name="usingIdCache" value="true" />
</bean>

<int:channel id="messageChannel">
    <int:queue message-store="messageSessionStore" />
</int:channel>

<int:poller id="defaultPoller" fixed-delay="500" max-messages-per-poll="1" task-executor="messageTaskExecutor" default="true">
    <int:transactional propagation="REQUIRED" synchronization-factory="syncFactory" isolation="READ_COMMITTED" transaction-manager="eosTransactionManager"/>
</int:poller>

However, the application runs on multiple nodes. When the servers are restarted (all nodes are shut down at once and then restarted in sequence), messages seem to be picked up by more than one node. Is there any way to avoid processing a message multiple times?


Solution

  • That should not be possible with OracleChannelMessageStoreQueryProvider, because its queries rely on FOR UPDATE SKIP LOCKED. When one node performs the SELECT, the rows it reads are locked, and the next node skips them and moves on to the next free rows in the table.

    There is a note in the JavaDoc for setUsingIdCache():

     * <p>If using the provided {@link OracleChannelMessageStoreQueryProvider}, don't set {@link #usingIdCache}
     * to true, as the Oracle query will ignore locked rows.</p>
    

    But I think that is unrelated to your problem. Removing that option and the <int:transaction-synchronization-factory> will simplify your configuration, but the behavior should not change.

    I think what you are seeing looks like round-robin: one node gets the first row, the next one skips it and gets the next one.

    I find it hard to believe that different nodes get the same message when the database is Oracle.
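
    The simplification suggested above might look roughly like the following. This is only a sketch that reuses the bean names from the question (messageSessionStore, messageQueryProvider, messageTaskExecutor, eosTransactionManager, dataSource); it drops the usingIdCache property and the synchronization-factory reference and leaves everything else as posted. It assumes the ID cache stays disabled when the property is not set, and that the syncFactory definition is deleted along with the reference to it.

```xml
<!-- Sketch only: the store from the question without the ID cache. -->
<bean id="messageSessionStore"
    class="org.springframework.integration.jdbc.store.JdbcChannelMessageStore">
    <property name="dataSource" ref="dataSource" />
    <property name="channelMessageStoreQueryProvider" ref="messageQueryProvider" />
    <property name="tablePrefix" value="QUEUE_" />
    <!-- usingIdCache is intentionally not set (assumed to default to false) -->
</bean>

<int:channel id="messageChannel">
    <int:queue message-store="messageSessionStore" />
</int:channel>

<!-- Sketch only: the poller from the question without synchronization-factory. -->
<int:poller id="defaultPoller" fixed-delay="500" max-messages-per-poll="1"
    task-executor="messageTaskExecutor" default="true">
    <int:transactional propagation="REQUIRED" isolation="READ_COMMITTED"
        transaction-manager="eosTransactionManager" />
</int:poller>
```

    With this setup each poll still runs inside a transaction, so the row locks taken through FOR UPDATE SKIP LOCKED are held until the transaction commits or rolls back, and that is what keeps other nodes from reading the same row.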