I have a normal Spring Cloud Stream application that simply reads data from a Kafka topic and produces messages to another Kafka topic. Please find the configuration below:
<parent>
  <groupId>org.springframework.boot</groupId>
  <artifactId>spring-boot-starter-parent</artifactId>
  <version>2.5.6</version>
  <relativePath/>
</parent>
<properties>
  <spring-cloud.version>2020.0.4</spring-cloud.version>
  <spring-boot-maven-plugin.version>2.3.0.RELEASE</spring-boot-maven-plugin.version>
</properties>
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-dependencies</artifactId>
      <version>${spring-cloud.version}</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>
<dependencies>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-mongodb-reactive</artifactId>
  </dependency>
  <dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <exclusions>
      <exclusion>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-tomcat</artifactId>
      </exclusion>
    </exclusions>
  </dependency>
</dependencies>
And the following application.properties:
#Kafka Configurations
spring.kafka.bootstrap-servers=localhost:9092
spring.cloud.stream.kafka.binder.configuration.auto.offset.reset=latest
spring.cloud.function.definition=merchantCredentials;validatedProducts;validateImages;retryUnprocessedItems
#Input topics
#Merchants
spring.cloud.stream.bindings.merchantCredentials-in-0.destination=mis.merchantCtpCredentials
spring.cloud.stream.kafka.bindings.merchantCredentials-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.merchantCredentials-in-0.contentType=application/json
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.merchantCredentials-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.merchantCredentials-in-0.group=tuevGroup
#kfc.notifications.product
spring.cloud.stream.bindings.validatedProducts-in-0.destination=kfc.notifications.product
spring.cloud.stream.kafka.bindings.validatedProducts-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.validatedProducts-in-0.contentType=application/json
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.concurrency=5
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.validatedProducts-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.validatedProducts-in-0.group=tuevGroup
#marketplace.products
spring.cloud.stream.bindings.validateImages-in-0.destination=marketplace.products
spring.cloud.stream.kafka.bindings.validateImages-in-0.consumer.ack-mode=manual_immediate
spring.cloud.stream.bindings.validateImages-in-0.contentType=application/json
spring.cloud.stream.bindings.validateImages-in-0.consumer.header-mode=headers
spring.cloud.stream.bindings.validateImages-in-0.consumer.partitioned=true
spring.cloud.stream.bindings.validateImages-in-0.consumer.max-attempts=1
spring.cloud.stream.bindings.validateImages-in-0.group=tuevGroup
#Output topics
#productValidated
spring.cloud.stream.bindings.validatedProducts-out-0.destination=marketplace.validated.products
spring.cloud.stream.bindings.validatedProducts-out-0.contentType=application/json
spring.cloud.stream.bindings.validatedProducts-out-0.producer.partition-count=10
spring.cloud.stream.bindings.validatedProducts-out-0.producer.header-mode=headers
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.destination=marketplace.validated.products
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.contentType=application/json
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.producer.partition-count=10
spring.cloud.stream.bindings.retryUnprocessedItems-out-0.producer.header-mode=headers
spring.cloud.stream.poller.cron=0 0/10 * * * *
spring.cloud.stream.poller.initial-delay=10000
Below are the signatures of all the defined Spring Cloud functions:
@Bean
public Consumer<Flux<Message<JsonNode>>> merchantCredentials() {
@Bean
public Function<Message<NotificationMessage>, Message<ProductValidatedEvent>> validatedProducts() {
@Bean
public Consumer<Message<ProductImportMessage>> validateImages() {
@PollableBean
@SchedulerLock(name = "retryProcess_scheduledTask", lockAtMostFor = "${retry.job.lock.atMost}", lockAtLeastFor = "${retry.job.lock.atLeast}")
public Supplier<Flux<Message<ProductValidatedEvent>>> retryUnprocessedItems() {
Everything works fine: the application starts and functions as it should. However, I encounter this exception in the logs multiple times, especially during the startup phase of the application:
org.springframework.boot.SpringApplication - Application run failed org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: Type must be one of Supplier, Function or Consumer
I have double-checked all configurations and still have no clue how to prevent this issue. Why is this exception happening? Can it be ignored?
UPDATE 1:
I have tracked the bug to this method in Spring Cloud Function, FunctionTypeUtils:
public static Type discoverFunctionTypeFromClass(Class<?> functionalClass) {
    Assert.isTrue(isFunctional(functionalClass), "Type must be one of Supplier, Function or Consumer");
This method gets called by this method in FunctionConfiguration:
private String[] filterEligibleFunctionDefinitions() {
    ...
    for (int i = 0; i < functionNames.length && eligibleDefinition; i++) {
        String functionName = functionNames[i];
        if (this.applicationContext.containsBean(functionName)) {
When I added breakpoints to this method, as well as to the previous one, I got the following output:
functionName: merchantCredentials
functionalClass: com.rewedigital.services.tuev.marketplace.merchant.flow.MerchantFlowManger$$Lambda$1323/0x00000008008fc040
functionName: validatedProducts
functionalClass: com.rewedigital.services.tuev.marketplace.validator.listener.ProductChangedListener$$Lambda$1331/0x00000008008fa040
functionName: validateImages
functionalClass: com.rewedigital.services.tuev.marketplace.sieve.listener.ProductImagesListener$$Lambda$1324/0x00000008008fc440
functionName: retryUnprocessedItems
functionalClass: org.springframework.beans.factory.support.NullBean
This shows that retryUnprocessedItems is the culprit, although at first I was not sure why.
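To make the failing assertion concrete, here is a minimal plain-Java sketch of the kind of check behind the message "Type must be one of Supplier, Function or Consumer". It is a simplified stand-in, not the actual Spring Cloud Function code: `isFunctional` here is my own reduced version, and `NullPlaceholder` is a hypothetical class playing the role of the NullBean seen in the debug output.

```java
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class FunctionalCheckSketch {

    // Hypothetical stand-in for the placeholder bean (NullBean in the debug output).
    public static final class NullPlaceholder { }

    // Simplified assumption of the check: the class must implement one of the
    // three java.util.function interfaces.
    public static boolean isFunctional(Class<?> type) {
        return Supplier.class.isAssignableFrom(type)
                || Function.class.isAssignableFrom(type)
                || Consumer.class.isAssignableFrom(type);
    }

    public static void main(String[] args) {
        Supplier<String> supplier = () -> "item";
        // A lambda class implements Supplier, so it passes the check.
        System.out.println(isFunctional(supplier.getClass()));
        // The placeholder implements none of the interfaces, so it fails the check.
        System.out.println(isFunctional(NullPlaceholder.class));
    }
}
```

The three lambda classes in the debug output would pass a check like this, while a placeholder class such as NullBean would not, which matches where the exception is raised.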
After some investigation, it turned out that the problem is mainly with the @SchedulerLock annotation.
I observed that this issue happens while the ShedLock table holds a lock on the method. In that case the bean resolves to a NullBean (as in the debug output above), which prevents the functionBindingRegistrar bean from registering the function, hence the exception.
Of course, this also means that the annotation is not usable here, because what @PollableBean actually runs is not the annotated method itself but the Supplier lambda expression it returns, which practically renders ShedLock useless.
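The effect described above can be simulated in plain Java, without Spring or ShedLock. This is only a sketch under stated assumptions: `lockHeld` is a hypothetical stand-in for the row in the ShedLock table, and `runIfUnlocked` mimics a lock-guarded method call that is skipped and yields null while the lock is held (which is consistent with the NullBean I saw, but I have not verified it against the ShedLock source).

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class LockPlacementSketch {

    // Hypothetical stand-in for the lock row in the ShedLock table.
    public static final AtomicBoolean lockHeld = new AtomicBoolean(false);

    // Mimics a lock-guarded call: the body is skipped and null is returned
    // while the lock is held (assumption, see lead-in).
    public static <T> T runIfUnlocked(Supplier<T> body) {
        if (lockHeld.get()) {
            return null;
        }
        return body.get();
    }

    // Plays the role of the annotated bean method: it only builds the Supplier.
    public static Supplier<String> retryUnprocessedItems() {
        return () -> "retry batch";
    }

    public static void main(String[] args) {
        // Startup while the lock is free: the factory runs and a Supplier is registered.
        lockHeld.set(false);
        Supplier<String> registered = runIfUnlocked(LockPlacementSketch::retryUnprocessedItems);
        System.out.println("registered at startup: " + (registered != null));

        // Later polls call the Supplier directly, so the lock is never consulted.
        lockHeld.set(true);
        System.out.println("poll while locked: " + registered.get());

        // Startup while another instance holds the lock: the factory is skipped.
        Supplier<String> skipped = runIfUnlocked(LockPlacementSketch::retryUnprocessedItems);
        System.out.println("registered while locked: " + (skipped != null));
    }
}
```

In this simulation the guard only ever wraps the one-time creation of the Supplier. It either lets the creation through, after which every poll runs unguarded, or it blocks the creation and leaves nothing to register.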
Once I removed the annotation, all the exceptions were gone and the sun shines again, birds sing, etc.
The next question to answer would be how to use a pollable bean in a distributed manner, but that is outside the scope of this question.