Exception: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel


I have a sandbox project for exploring the newly added functionality in Spring Cloud Stream, but I've run into a problem using a Function and a Supplier in the same Spring Cloud Stream application.

The code is based on the examples described in the docs.

First I added a Function<String, String> to the project, with the corresponding spring.cloud.stream.bindings and spring.cloud.stream.function.definition properties in application.yml. Everything works fine: I post a message to the my-fun-in Kafka topic, and the application executes the function and sends the result to the my-fun-out topic.

Then I added a Supplier<Flux<String>> to the same project, with the corresponding spring.cloud.stream.bindings, and updated the spring.cloud.stream.function.definition value to fun;sup. This is where weird things start to happen. When I try to start the application I get the following error:

2020-01-15 01:45:16.608 ERROR 10128 --- [oundedElastic-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'application.sup-out-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}], failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:206)
    at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:188)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:170)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:219)
    at org.springframework.integration.handler.AbstractMessageHandler.onNext(AbstractMessageHandler.java:57)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:165)
    at org.springframework.integration.endpoint.ReactiveStreamsConsumer$DelegatingSubscriber.hookOnNext(ReactiveStreamsConsumer.java:148)
    at reactor.core.publisher.BaseSubscriber.onNext(BaseSubscriber.java:160)
    at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onNext(FluxDoFinally.java:123)
    at reactor.core.publisher.EmitterProcessor.drain(EmitterProcessor.java:426)
    at reactor.core.publisher.EmitterProcessor.onNext(EmitterProcessor.java:268)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.drain(FluxCreate.java:793)
    at reactor.core.publisher.FluxCreate$BufferAsyncSink.next(FluxCreate.java:718)
    at reactor.core.publisher.FluxCreate$SerializedSink.next(FluxCreate.java:153)
    at org.springframework.integration.channel.FluxMessageChannel.doSend(FluxMessageChannel.java:63)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.integration.channel.FluxMessageChannel.lambda$subscribeTo$2(FluxMessageChannel.java:83)
    at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:189)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.runAsync(FluxPublishOn.java:398)
    at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.run(FluxPublishOn.java:484)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:84)
    at reactor.core.scheduler.WorkerTask.call(WorkerTask.java:37)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:304)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[20], headers={contentType=application/json, id=89301e00-b285-56e0-cb4d-8133555c8905, timestamp=1579045516603}]
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    ... 34 more

After that I tried several things:

  1. Reverted spring.cloud.stream.function.definition to fun (which disables binding of the sup bean to the external destination). The application started, the function worked, the supplier didn't work. Everything as expected.
  2. Changed spring.cloud.stream.function.definition to sup (which disables binding of the fun bean to the external destination). The application started, the function didn't work, the supplier worked (it produced a message to the my-sup-out topic every second). Everything as expected as well.
  3. Updated the spring.cloud.stream.function.definition value to fun;sup. The application didn't start; I got the same MessageDeliveryException.
  4. Swapped the spring.cloud.stream.function.definition value to sup;fun. The application started and the supplier worked, but the function didn't work (it didn't send messages to the my-fun-out topic).

The last one confused me even more than the error, so now I need someone's help to sort things out.

Did I miss something in the configuration? Why does changing the order of the beans separated by ; in spring.cloud.stream.function.definition lead to different results?

The full project is uploaded to GitHub and included below:

StreamApplication.java:

package com.kaine;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import reactor.core.publisher.Flux;

import java.util.function.Function;
import java.util.function.Supplier;

@SpringBootApplication
public class StreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(StreamApplication.class);
    }

    @Bean
    public Function<String, String> fun() {
        return value -> value.toUpperCase();
    }

    @Bean
    public Supplier<Flux<String>> sup() {
        return () -> Flux.from(emitter -> {
            while (true) {
                try {
                    emitter.onNext("Hello from Supplier!");
                    Thread.sleep(1000);
                } catch (Exception e) {
                    // ignore
                }
            }
        });
    }
}

application.yml:

spring:
  cloud:
    stream:
      function:
        definition: fun;sup
      bindings:
        fun-in-0:
          destination: my-fun-in
        fun-out-0:
          destination: my-fun-out
        sup-out-0:
          destination: my-sup-out
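
For reference, the binding names in this file follow Spring Cloud Stream's functional naming convention: <functionName>-in-<index> for inputs and <functionName>-out-<index> for outputs. Below is a minimal sketch of that convention. The BindingNames class and the Kind enum are hypothetical helpers written only for illustration, not framework code, and the sketch assumes single-input, single-output beans (index 0 only).

```java
import java.util.ArrayList;
import java.util.List;

public class BindingNames {

    // Hypothetical classification of a functional bean, for illustration only.
    enum Kind { FUNCTION, SUPPLIER, CONSUMER }

    // Derives the binding names a bean of the given kind is expected to get,
    // assuming a single input and/or a single output (index 0).
    static List<String> bindingNames(String beanName, Kind kind) {
        List<String> names = new ArrayList<>();
        if (kind != Kind.SUPPLIER) {
            names.add(beanName + "-in-0");   // functions and consumers have an input
        }
        if (kind != Kind.CONSUMER) {
            names.add(beanName + "-out-0");  // functions and suppliers have an output
        }
        return names;
    }

    public static void main(String[] args) {
        System.out.println(bindingNames("fun", Kind.FUNCTION)); // prints [fun-in-0, fun-out-0]
        System.out.println(bindingNames("sup", Kind.SUPPLIER)); // prints [sup-out-0]
    }
}
```

These are the same three names configured under spring.cloud.stream.bindings above.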

build.gradle.kts:

plugins {
    java
}

group = "com.kaine"
version = "1.0-SNAPSHOT"

repositories {
    mavenCentral()
}

dependencies {
    implementation(platform("org.springframework.cloud:spring-cloud-dependencies:Hoxton.SR1"))
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")

    implementation(platform("org.springframework.boot:spring-boot-dependencies:2.2.2.RELEASE"))
}

configure<JavaPluginConvention> {
    sourceCompatibility = JavaVersion.VERSION_11
}

Solution

  • Actually, this is a problem with our documentation: I believe we provide a bad example of a reactive Supplier for this case. The issue is that your Supplier is in an infinite blocking loop, so it basically never returns. Please change it to something like:

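    // Additional imports needed in StreamApplication.java:
    //   import java.util.stream.Stream;
    //   import reactor.core.scheduler.Schedulers;

    @Bean
    public Supplier<Flux<String>> sup() {
        return () -> Flux.fromStream(Stream.generate(new Supplier<String>() {

            @Override
            public String get() {
                try {
                    // Pace the stream: one element per second.
                    Thread.sleep(1000);
                } catch (Exception e) {
                    // ignore
                }
                // Return outside the try block so every code path yields a value.
                return "Hello from Supplier";
            }

        // Pull elements on a separate scheduler so startup is not blocked,
        // and share the stream between subscribers.
        })).subscribeOn(Schedulers.elastic()).share();
    }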
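
The reason this version returns is that Stream.generate is lazy: calling get() only builds the stream, and elements are produced when a subscriber pulls them. A minimal plain-Java sketch of that behaviour, without Reactor or Spring (the LazySupplierSketch class and its calls counter are hypothetical names used only for illustration, and the one-second sleep is left out so it runs instantly):

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazySupplierSketch {

    // Hypothetical counter recording how often the generator has been invoked.
    static final AtomicInteger calls = new AtomicInteger();

    // get() only builds an infinite, lazy stream; it does not loop.
    static Supplier<Stream<String>> sup() {
        return () -> Stream.generate(() -> {
            calls.incrementAndGet();
            return "Hello from Supplier";
        });
    }

    public static void main(String[] args) {
        Stream<String> stream = sup().get();  // returns immediately
        System.out.println(calls.get());      // prints 0: nothing generated yet

        // Elements are produced only when a terminal operation pulls them.
        List<String> firstThree = stream.limit(3).collect(Collectors.toList());
        System.out.println(firstThree);
        // prints [Hello from Supplier, Hello from Supplier, Hello from Supplier]
    }
}
```

By contrast, the while (true) loop in the original Supplier does its work inside the subscription callback itself, so control never comes back to the caller.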