Spring Integration has ZooKeeper support as documented in https://docs.spring.io/spring-integration/reference/html/zookeeper.html However this document is so vague.
It suggests adding below bean but does not give details on how to start/stop a poller when the node is granted leadership.
public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
return new LeaderInitiatorFactoryBean()
Do we have any example on how to ensure below poller is run only once in a cluster at any time using zookeeper?
public class EventsPoller {
public void pullEvents() {
//pull events should be run by only one node in the cluster at any time
The LeaderInitiator
emits an OnGrantedEvent
and OnRevokedEvent
, when it becomes leader and its leadership is revoked.
See https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#endpoint-roles and the next https://docs.spring.io/spring-integration/reference/html/messaging-endpoints-chapter.html#leadership-event-handling for more info about those events handling and how it affects your components in the particular role.
Although I agree that Zookkeper chapter must have some link to that SmartLifecycleRoleController
chapter. Feel free to raise a JIRA on the matter and contribution is welcome!
This is what I did in our test:
public class LeaderInitiatorFactoryBeanTests extends ZookeeperTestSupport {
private static CuratorFramework client;
private PollableChannel stringsChannel;
public static void getClient() throws Exception {
client = createNewClient();
public static void closeClient() {
if (client != null) {
public void test() {
public static class Config {
public LeaderInitiatorFactoryBean leaderInitiator(CuratorFramework client) {
return new LeaderInitiatorFactoryBean()
public CuratorFramework client() {
return LeaderInitiatorFactoryBeanTests.client;
@InboundChannelAdapter(channel = "stringsChannel", autoStartup = "false", poller = @Poller(fixedDelay = "100"))
public Supplier<String> inboundChannelAdapter() {
return () -> "foo";
public PollableChannel stringsChannel() {
return new QueueChannel();
And I have in logs something like this:
2018-12-14 10:12:33,542 DEBUG [Curator-LeaderSelector-0] [org.springframework.integration.support.SmartLifecycleRoleController] - Starting [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo
2018-12-14 10:12:33,578 DEBUG [Curator-LeaderSelector-0] [org.springframework.integration.support.SmartLifecycleRoleController] - Stopping [leaderInitiatorFactoryBeanTests.Config.inboundChannelAdapter.inboundChannelAdapter] in role foo