Search code examples
apache-kafka spring-integration vaadin spring-cloud spring-cloud-stream

Dispatcher has no subscribers for channel - spring-cloud-stream-kafka


After upgrading to Spring Boot 2, Reactor 3.5, kafka-binder 2.0.0 RELEASE, and kafka-client 1.0.1, one of my modules no longer works. I have spent 5 days trying to make it work and have read probably every related topic, but I cannot find the reason for this behavior.

Main class:

/**
 * Application entry point and root Spring Boot configuration class.
 *
 * NOTE(review): only {@code @SpringBootConfiguration} and {@code @ComponentScan}
 * are declared here, not {@code @EnableAutoConfiguration}; presumably
 * auto-configuration is enabled somewhere else in the project — confirm.
 */
@Slf4j
@EnableI18N
@EnableSideBar
@ComponentScan
@SpringBootConfiguration
@EnableConfigurationProperties
@EnableAspectJAutoProxy(proxyTargetClass = true)
public class View
{
    /**
     * Builds the Spring application from this class with the "production"
     * profile, the startup banner switched off and headless mode on, runs it,
     * and then logs the local URL.
     *
     * @param args command-line arguments passed through to the application run
     */
    public static void main(String[] args)
    {
        new SpringApplicationBuilder(View.class)
            .profiles("production")
            .bannerMode(Banner.Mode.OFF)
            .headless(true)
            .application()
            .run(args);

        // Reached only after run(args) has returned.
        log.info("\nhttp://localhost:8083/\n");
    }
}

Configuration marker:

/**
 * Marker configuration for the "production" profile. It declares no beans of
 * its own; it exists to carry {@code @EnableBinding} for {@link OffersChannel}.
 */
@Configuration
@Profile("production")
@EnableBinding(OffersChannel.class)
class ProductionOffersConfiguration
{
}

Channel interface:

/**
 * Declares the two message channels used for offers: one inbound channel and
 * one outbound channel. The channel names are exposed as constants so that
 * other classes can refer to them.
 */
public interface OffersChannel
{
    /** Name of the inbound channel. */
    String OFFERS_OBTAIN = "offersObtain";
    /** Name of the outbound channel. */
    String OFFERS_OBTAIN_REQUEST= "offersObtainRequest";

    /**
     * @return the inbound channel registered under {@link #OFFERS_OBTAIN}
     */
    @Input(OFFERS_OBTAIN)
    SubscribableChannel offersChannel();

    /**
     * @return the outbound channel registered under {@link #OFFERS_OBTAIN_REQUEST}
     */
    @Output(OFFERS_OBTAIN_REQUEST)
    MessageChannel offersRequestChannel();
}

And the AdminUI class. What is important: before I updated the dependencies, when this class was initialized I could see a log message in the console saying that I had subscribed to the channel; now nothing happens:

/**
 * Admin UI: a vertical split panel with a button bar on top and a grid of
 * {@code Ad} entries below. Ads received through the
 * {@link OffersChannel#OFFERS_OBTAIN} input are appended to the grid, and a
 * button sends a request for more offers on the outbound channel.
 */
@Slf4j
@Push
@Theme("${view.default-theme}")
@SpringUI(path = WebsiteMapping.ADMIN)
@RequiredArgsConstructor
public class AdminUI extends UI
{
    // NOTE(review): @RequiredArgsConstructor only takes final / @NonNull fields
    // as constructor parameters. Unless a lombok.config makes fields final by
    // default, offersObtainRequest and connectionService are never assigned —
    // confirm.
    MessageChannel offersObtainRequest;
    Grid<Ad> adGrid = createAdGrid();
    // Backing list for the grid; every received ad is appended here.
    private List<Ad> ads = new LinkedList<>();
    ConnectionService connectionService;

    /**
     * Sets the split panel as the UI content and binds the grid to the
     * backing list.
     *
     * @param vaadinRequest the request that caused this UI to be created
     */
    @Override
    @SneakyThrows
    protected void init(VaadinRequest vaadinRequest)
    {
        setContent(splitPane());
        adGrid.setItems(ads);
    }

    /**
     * Builds the root layout: the button bar in the first slot (split position
     * 10 percent) and the ad grid in the second.
     */
    private VerticalSplitPanel splitPane()
    {
        VerticalSplitPanel verticalSplitPanel = new VerticalSplitPanel();
        verticalSplitPanel.setSplitPosition(10, Unit.PERCENTAGE);
        verticalSplitPanel.setFirstComponent(buttonsLayout());
        verticalSplitPanel.setSecondComponent(adGrid);

        return verticalSplitPanel;
    }

    /**
     * Builds the horizontal button bar: the "request more offers" button
     * followed by a right-aligned theme selector.
     */
    private Layout buttonsLayout()
    {
        HorizontalLayout layout = new HorizontalLayout();
        layout.setMargin(true);

        layout.addComponent(requestMoreOffersButton());

        ThemeSelectorComboBox themeSelectorComboBox = new ThemeSelectorComboBox();
        layout.addComponent(themeSelectorComboBox);
        layout.setComponentAlignment(themeSelectorComboBox, Alignment.MIDDLE_RIGHT);

        return layout;
    }

    /**
     * Makes the grid full-size and adds the Title, Location and Href columns.
     */
    @PostConstruct
    private void createGridProperties()
    {
        adGrid.setSizeFull();
        adGrid.addColumn(Ad::getTitle).setCaption("Title");
        adGrid.addColumn(Ad::getLocation).setCaption("Location");
        adGrid.addColumn(Ad::getHref).setCaption("Href");
    }

    /**
     * Subscribes to the stream of incoming ads and shows each one in the grid.
     *
     * NOTE(review): this listener is declared on a {@code @SpringUI} class
     * rather than on a configuration class. Spring Cloud Stream may not
     * register it, which would leave the 'offersObtain' channel without
     * subscribers — confirm, and consider moving the listener to a
     * configuration class.
     *
     * @param fluxAd ads arriving on the {@link OffersChannel#OFFERS_OBTAIN} input
     */
    @StreamListener
    public void fetchAdsFrom(@Input(OffersChannel.OFFERS_OBTAIN) Flux<Ad> fluxAd)
    {
        fluxAd.subscribe(this::displayOfferInGrid);
    }

    /**
     * Appends the ad to the backing list and re-sets the grid items.
     *
     * NOTE(review): this is invoked from the Flux subscription, presumably on
     * a non-UI thread; Vaadin normally expects such updates to go through
     * {@code UI.access(...)} — confirm.
     */
    private void displayOfferInGrid(Ad ad)
    {
        ads.add(ad);
        adGrid.setItems(ads);
    }

    /** Creates the button that triggers {@link #requestMoreOffers(Event)}. */
    private Button requestMoreOffersButton()
    {
        return new Button("Request 10 more offers", this::requestMoreOffers);
    }

    /**
     * Creates a button that calls {@link #startService(String, String)} with
     * the given URL and message when clicked.
     */
    private Button startServiceButton(String caption, String url, String message)
    {
        return new Button(caption, buttonClickedEvent -> startService(url, message));
    }

    /**
     * Requests the given URL through the connection service and shows
     * {@code message} as a notification once the request completes.
     */
    private void startService(String url, String message)
    {
        ConnectionRequest startProviderRequest = ConnectionRequest
            .builder()
            .url(url)
            .build();

        connectionService
            .getForHtml(startProviderRequest)
            .thenAccept(serviceStarted -> Notification.show(message));
    }

    /**
     * Sends an {@code AdBroadcastRequest} built from the number of ads already
     * held and the constant 10 on the outbound channel.
     */
    private void requestMoreOffers(Event event)
    {
        // Diamond instead of the raw GenericMessage type: same runtime
        // behaviour, no unchecked warning.
        offersObtainRequest.send(new GenericMessage<>(new AdBroadcastRequest(ads.size(), 10)));
    }

    /** Creates the empty grid; columns are added in {@link #createGridProperties()}. */
    private Grid<Ad> createAdGrid()
    {
        return new Grid<>();
    }
}

Application.yml:

# Spring Cloud Stream binding configuration for the two channels declared in
# OffersChannel.
spring:
    cloud:
        stream:
            bindings:
                # Channel "offersObtainRequest" (OffersChannel.OFFERS_OBTAIN_REQUEST).
                # NOTE(review): the destination is spelled "adsBroadcastRequestes";
                # verify it matches the topic name used by the receiving side.
                # NOTE(review): this channel is declared with @Output; `group` is
                # presumably only meaningful for consumer bindings — confirm.
                offersObtainRequest:
                   destination: adsBroadcastRequestes
                   binder: kafka
                   group: adsBroadcastRequestsProducer
                # Channel "offersObtain" (OffersChannel.OFFERS_OBTAIN): reads from
                # destination "adsBroadcast" in group "adsConsumer".
                offersObtain:
                   destination: adsBroadcast
                   binder: kafka
                   group: adsConsumer

stacktrace:

2018-04-07 11:27:12.010 DEBUG o.s.retry.support.RetryTemplate         : Retry: count=0
2018-04-07 11:27:12.010 DEBUG o.s.integration.channel.DirectChannel   : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:12.010 DEBUG o.s.r.backoff.ExponentialBackOffPolicy  : Sleeping for 1000
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate         : Checking for rethrow: count=1
2018-04-07 11:27:13.011 DEBUG o.s.retry.support.RetryTemplate         : Retry: count=1
2018-04-07 11:27:13.011 DEBUG o.s.integration.channel.DirectChannel   : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=2, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:13.011 DEBUG o.s.r.backoff.ExponentialBackOffPolicy  : Sleeping for 2000
2018-04-07 11:27:13.909 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Sending Heartbeat request to coordinator Kacper-PC:9092 (id: 2147483647 rack: null)
2018-04-07 11:27:14.138 DEBUG o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-2, groupId=adsConsumer] Received successful Heartbeat response
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Checking for rethrow: count=2
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Retry: count=2
2018-04-07 11:27:15.011 DEBUG o.s.integration.channel.DirectChannel   : preSend on channel 'offersObtain', message: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Checking for rethrow: count=3
2018-04-07 11:27:15.011 DEBUG o.s.retry.support.RetryTemplate         : Retry failed last attempt: count=3
2018-04-07 11:27:15.012 DEBUG o.s.i.h.a.ErrorMessageSendingRecoverer  : Sending ErrorMessage: failedMessage: GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]

org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.offersObtain'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[227], headers={kafka_offset=12, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@11d6acc8, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=adsBroadcast, kafka_receivedTimestamp=1523093225984}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:70) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:120) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) [spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211) [spring-retry-1.2.2.RELEASE.jar:na]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_151]
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_151]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_151]
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:138) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
    ... 23 common frames omitted

Solution

  • Well, there is a lot going on in there. I mean, all the non-Spring-Cloud-Stream stuff makes it hard to follow. In any event, your @StreamListener is not defined inside any Spring-managed configuration class, so it doesn't get picked up. You can move it to ProductionOffersConfiguration, View, or any other Spring-managed configuration class.

    Also, consider going through this quick tutorial https://docs.spring.io/spring-cloud-stream/docs/Elmhurst.RELEASE/reference/htmlsingle/#_quick_start (5 min) to better understand the mechanics of spring-cloud-stream