I just upgraded a Spring Boot application that uses Spring Cloud Stream Kafka producers and consumers to
plugins {
id("org.springframework.boot") version "2.6.1"
...
}
extra["springCloudVersion"] = "2021.0.0"
extra["springCloudStreamVersion"] = "3.2.1"
The applications doesn't start anymore with the following exception:
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionInitializer' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1804)
...
Caused by: java.lang.ClassCastException: class reactor.core.publisher.MonoPeekTerminal cannot be cast to class reactor.core.publisher.Flux (reactor.core.publisher.MonoPeekTerminal and reactor.core.publisher.Flux are in unnamed module of loader 'app')
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorFluxStream(TraceFunctionAroundWrapper.java:187)
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.reactorStream(TraceFunctionAroundWrapper.java:120)
at org.springframework.cloud.sleuth.instrument.messaging.TraceFunctionAroundWrapper.doApply(TraceFunctionAroundWrapper.java:97)
at org.springframework.cloud.function.context.catalog.FunctionAroundWrapper.apply(FunctionAroundWrapper.java:47)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$1.doApply(SimpleFunctionRegistry.java:256)
at org.springframework.cloud.function.context.catalog.SimpleFunctionRegistry$FunctionInvocationWrapper.apply(SimpleFunctionRegistry.java:550)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.bindFunctionToDestinations(FunctionConfiguration.java:512)
at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder.afterPropertiesSet(FunctionConfiguration.java:418)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.invokeInitMethods(AbstractAutowireCapableBeanFactory.java:1863)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1800)
... 16 common frames omitted
Did I miss any upgrade guide or is it a bug?
Producer
@Component
class EventProducer(@Qualifier("eventSink") private val eventProcessor: Sinks.Many<Message<EventReceived>>) {
private val logger = LoggerFactory.getLogger(javaClass)
fun send(event: EventReceived): Mono<EventReceived> {
return Mono.defer {
val message = MessageBuilder.withPayload(event)
.setHeader(MESSAGE_KEY, event.id)
.setHeader(TIMESTAMP, OffsetDateTime.now().toInstant().toEpochMilli())
.build()
logger.info("Sending event {}", event)
while (eventProcessor.tryEmitNext(message).isFailure) {
LockSupport.parkNanos(10)
}
event.toMono()
}.subscribeOn(Schedulers.boundedElastic())
}
Consumer
@Configuration
class MetricConsumer(...) {
private val logger = LoggerFactory.getLogger(javaClass)
@Bean
fun consumeMetricUpdated(): Function<Flux<Message<MetricUpdated>>, Mono<Void>> {
...
}
This looks like a bug in s-c-sleuth. I'll follow up with Marcin about this.
Can you please post the signature of your function as well, need to confirm something?
Meanwhile, you can temporarily disconnect sleuth's TraceFunctionAroundWrapper
by setting spring.sleuth.function.enabled
to false
.