Search code examples
springspring-bootspring-cloudspring-cloud-stream

Startup error in Spring Cloud Stream after upgrading to Spring Boot 2.6.1 and Spring Cloud 2021.0.0


I just upgraded a Spring Boot application that uses Spring Cloud Stream Kafka producers and consumers to

plugins {
    id("org.springframework.boot") version "2.6.1"
    ...
}
extra["springCloudVersion"] = "2021.0.0"
extra["springCloudStreamVersion"] = "3.2.1"

The applications doesn't start anymore with the following exception:

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
...
Caused by: java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
    at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorFluxStream(TraceFunctionAroundWrapper.java:187)
    at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorStream(TraceFunctionAroundWrapper.java:120)
    at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:97)
    at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:47)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$1.doApply(SimpleFunctionRegistry.java:256)
    at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:550)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.bindFunctionToDestinations(FunctionConfiguration.java:512)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.afterPropertiesSet(FunctionConfiguration.java:418)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
    ... 16 common frames omitted

Did I miss any upgrade guide or is it a bug?

Producer

@Component
class EventProducer(@Qualifier("eventSink") private val eventProcessor: Sinks.Many<Message<EventReceived>>) {

    private val logger = LoggerFactory.getLogger(javaClass)

    fun send(event: EventReceived): Mono<EventReceived> {
        return Mono.defer {
            val message = MessageBuilder.withPayload(event)
                .setHeader(MESSAGE_KEY, event.id)
                .setHeader(TIMESTAMP, OffsetDateTime.now().toInstant().toEpochMilli())
                .build()
            logger.info("Sending event {}", event)
            while (eventProcessor.tryEmitNext(message).isFailure) {
                LockSupport.parkNanos(10)
            }
            event.toMono()
        }.subscribeOn(Schedulers.boundedElastic())
    }

Consumer

@Configuration
class MetricConsumer(...) {

    private val logger = LoggerFactory.getLogger(javaClass)

    @Bean
    fun consumeMetricUpdated(): Function<Flux<Message<MetricUpdated>>, Mono<Void>> {
        ...
    }


Solution

  • This looks like a bug in s-c-sleuth. I'll follow up with Marcin about this. Can you please post the signature of your function as well, need to confirm something? Meanwhile, you can temporarily disconnect sleuth's TraceFunctionAroundWrapper by setting spring.sleuth.function.enabled to false.