Search code examples

How to implement distributed lock around poller in Spring Integration using ZooKeeper

Spring Integration has ZooKeeper support as documented in However this document is so vague.

It suggests adding below bean but does not give details on how to start/stop a poller when the node is granted leadership.

public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
    return new LeaderInitiatorFactoryBean()

Do we have any example on how to ensure below poller is run only once in a cluster at any time using zookeeper?

public class EventsPoller {

    public void pullEvents() {
        //pull events should be run by only one node in the cluster at any time


  • The LeaderInitiator emits an OnGrantedEvent and OnRevokedEvent, when it becomes leader and its leadership is revoked.

    See and the next for more info about those events handling and how it affects your components in the particular role.

    Although I agree that Zookkeper chapter must have some link to that SmartLifecycleRoleController chapter. Feel free to raise a JIRA on the matter and contribution is welcome!


    This is what I did in our test:

    public class LeaderInitiatorFactoryBeanTests extends ZookeeperTestSupport {
        private static CuratorFramework client;
        private PollableChannel stringsChannel;
        public static void getClient() throws Exception {
            client = createNewClient();
        public static void closeClient() {
            if (client != null) {
        public void test() {
        public static class Config {
            public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
                return new LeaderInitiatorFactoryBean()
            public CuratorFramework client() {
                return LeaderInitiatorFactoryBeanTests.client;
            @InboundChannelAdapter(channel = "stringsChannel", autoStartup = "false", poller = @Poller(fixedDelay = "100"))
            public Supplier<String> inboundChannelAdapter() {
                return () -> "foo";
            public PollableChannel stringsChannel() {
                return new QueueChannel();

    And I have in logs something like this:

    2018-12-14 10:12:33,542 DEBUG [Curator-LeaderSelector-0] [] - Starting [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo
    2018-12-14 10:12:33,578 DEBUG [Curator-LeaderSelector-0] [] - Stopping [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo