Tags: spring, spring-kafka, spring-cloud-stream

Spring Cloud Stream - First Kafka messages get error "Dispatcher has no subscribers"


My app successfully sends Kafka messages, but only after Kafka is initialized. Before that I get the error "Dispatcher has no subscribers". How do I wait until the subscribers have been registered for the channels?

Here's a trace of the order of events (timestamps in seconds.milliseconds):

  • 17.165 SenderClass created
  • 17.816 initialization class, @PostConstruct starts PollingTask
  • 24.781 PollingTask sends first Kafka message
  • 24.816 First error: "Dispatcher has no subscribers"
  • 25.778 Registering MessageChannel my-channel
  • still seeing Dispatcher errors
  • 27.067 Channel 'my-channel' has 1 subscriber
  • No more errors after this, messages send fine
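
The sequence in this trace can be reproduced in miniature without Spring. The following is a minimal plain-Java sketch, assuming a hypothetical `SimpleChannel` class that rejects a send while it has no subscribers. It is only a model of the ordering; Spring Integration's real channel and dispatcher classes are more involved.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DispatcherTimingDemo {

    /** Hypothetical stand-in for a channel: delivery fails if nobody is subscribed. */
    public static class SimpleChannel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        public void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        public boolean send(String payload) {
            if (subscribers.isEmpty()) {
                // Mirrors the error text from the trace.
                throw new IllegalStateException("Dispatcher has no subscribers");
            }
            subscribers.forEach(s -> s.accept(payload));
            return true;
        }
    }

    /** Replays the trace: send too early, register the subscriber, send again. */
    public static List<String> replayTrace() {
        List<String> log = new ArrayList<>();
        SimpleChannel channel = new SimpleChannel();

        try {
            channel.send("first");          // like the send at 24.781
        } catch (IllegalStateException e) {
            log.add("error: " + e.getMessage());
        }

        channel.subscribe(p -> log.add("delivered: " + p)); // like 27.067
        channel.send("second");             // sends succeed from here on
        return log;
    }

    public static void main(String[] args) {
        replayTrace().forEach(System.out::println);
    }
}
```

The first send fails and the second is delivered, which matches the trace: nothing is wrong with the sending code itself, only with when it first runs.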

I'm not sure how to approach this. Wild guesses have included:

  1. Place sending code in @PostConstruct
  2. Add @AutoConfigureBefore(BindingServiceConfiguration.class) to Sender
  3. Add @AutoConfigureAfter(BindingServiceConfiguration.class) to SenderClass
  4. Add @AutoConfigureBefore(BindingServiceConfiguration.class) to Main
  5. Place @DependsOn({"EnableBindingClass"}) on Task
  6. Place @DependsOn({"ApplicationLifeCycle"}) on SchedulerClass, where ApplicationLifeCycle is a class that does nothing but implement SmartLifecycle, with getPhase() returning Integer.MAX_VALUE
  7. Making sure ComponentScan is on for the whole package (a suggestion from other SO threads)
  8. Various combinations of the above

I created a new app and made it as simple as I could:

public interface Source {
  @Output(channelName)
  MessageChannel outboundChannel();
}

@EnableBinding(Source.class)
@Component
public class Sender {
  @Autowired
  private Source source;

  // Returns true if the channel accepted the message.
  public boolean send(SomeObject object) {
    return source.outboundChannel().send(MessageBuilder.withPayload(object).build());
  }
}

@Service
public class Scheduler {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  // Polling is started during bean initialization.
  @PostConstruct
  public void initialize() {
    taskScheduler.schedule(new PollingTask(), nextTime);
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for (SomeObject object : objects) {
        sender.send(object);
      }

      // Reschedule this task to run again in one second.
      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
  }
}

Edit to add Solution

It works now! In the scheduler that starts the message-sending tasks, I switched from starting them in @PostConstruct to starting them in SmartLifecycle::start().

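@Service
public class Scheduler implements SmartLifecycle {
  @Autowired
  Sender sender;
  @Autowired
  ThreadPoolTaskScheduler taskScheduler;

  private volatile boolean running;

  // Polling is started from the lifecycle callback instead of @PostConstruct.
  @Override
  public void start() {
    taskScheduler.schedule(new PollingTask(), nextTime);
    running = true;
  }

  // Lifecycle also requires stop() and isRunning(); minimal implementations.
  @Override
  public void stop() {
    running = false;
  }

  @Override
  public boolean isRunning() {
    return running;
  }

  private class PollingTask implements Runnable {
    @Override
    public void run() {
      List<SomeObject> objects = getDummyData();
      for (SomeObject object : objects) {
        sender.send(object);
      }

      // Reschedule this task to run again in one second.
      Instant nextTime = Instant.now().plusMillis(1_000L);
      try {
        taskScheduler.schedule(new PollingTask(), nextTime);
      } catch (Exception e) {
        logger.error(e);
      }
    }
  }
}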

Solution

  • @PostConstruct is too early to send messages; the context is still being built. Implement SmartLifecycle, put the bean in a high phase (Integer.MAX_VALUE), and do the sends in start().

    Or do the sends in an ApplicationRunner.
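
The ApplicationRunner option could look roughly like the sketch below. It assumes Spring Boot is on the classpath and reuses the Sender, SomeObject, and getDummyData() names from the question; PollingStarter and the one-second delay are illustrative choices, not part of the original code.

```java
import java.time.Instant;
import java.util.List;

import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.stereotype.Component;

// Hypothetical replacement for the @PostConstruct-based startup in the question.
@Component
public class PollingStarter implements ApplicationRunner {

    private final Sender sender;                         // Sender bean from the question
    private final ThreadPoolTaskScheduler taskScheduler;

    public PollingStarter(Sender sender, ThreadPoolTaskScheduler taskScheduler) {
        this.sender = sender;
        this.taskScheduler = taskScheduler;
    }

    // Spring Boot invokes runners once the application context has been refreshed,
    // so the first poll is only scheduled after startup has completed.
    @Override
    public void run(ApplicationArguments args) {
        taskScheduler.schedule(this::poll, Instant.now().plusMillis(1_000L));
    }

    private void poll() {
        for (SomeObject object : getDummyData()) {
            sender.send(object);
        }
        // Reschedule the next poll in one second, as PollingTask does in the question.
        taskScheduler.schedule(this::poll, Instant.now().plusMillis(1_000L));
    }

    // Placeholder for the question's getDummyData(); returns nothing here.
    private List<SomeObject> getDummyData() {
        return List.of();
    }
}
```

With this in place, the Scheduler from the question no longer needs its @PostConstruct method, since the runner takes over the job of starting the polling.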