I'm trying to build a Kotlin Processor based on Spring Cloud Stream 3.1 using Kotlin Lambda. I'm deploying this processor into a stream using Spring Cloud Data Flow but it is marked as failed.
If I cat the error log I have the following error : IllegalArgumentException: Type must be one of Supplier, Function or Consumer
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: Type must be one of Supplier, Function or Consumer
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1788) ~[spring-beans-5.3.2.jar!/:5.3.2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:609) ~[spring-beans-5.3.2.jar!/:5.3.2]
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:531) ~[spring-beans-5.3.2.jar!/:5.3.2]
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.2.jar!/:5.3.2]
Here is my code :
@SpringBootApplication
class TransformerUppercaseApplication{
@Bean
fun transform(): (String) -> String {
return { "This is my uppercase".plus(it.toUpperCase()) }
}
}
fun main(args: Array<String>) {
runApplication<TransformerUppercaseApplication>(*args)
}
I've tried specifying the function name into my application.yml but no success:
spring:
cloud:
stream:
function:
definition: transform
Here is my build.gradle.kts :
import org.jetbrains.kotlin.gradle.tasks.KotlinCompile
plugins {
`maven-publish`
id("org.springframework.boot") version "2.4.1"
id("io.spring.dependency-management") version "1.0.10.RELEASE"
kotlin("jvm") version "1.4.21"
kotlin("plugin.spring") version "1.4.21"
}
group = "com.company"
version = "0.0.11"
java.sourceCompatibility = JavaVersion.VERSION_11
repositories {
mavenCentral()
maven { url = uri("https://repo.spring.io/milestone") }
}
extra["springCloudVersion"] = "2020.0.0"
dependencies {
implementation("org.jetbrains.kotlin:kotlin-reflect")
implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
implementation("org.springframework.cloud:spring-cloud-function-kotlin")
testImplementation("org.springframework.boot:spring-boot-starter-test")
}
dependencyManagement {
imports {
mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
}
}
tasks.withType<KotlinCompile> {
kotlinOptions {
freeCompilerArgs = listOf("-Xjsr305=strict")
jvmTarget = "11"
}
}
tasks.withType<Test> {
useJUnitPlatform()
}
publishing {
publications {
create<MavenPublication>("mavenJava") {
from(components["java"])
}
}
}
configurations {
val elements = listOf(apiElements, runtimeElements)
elements.forEach { element ->
element.get().outgoing.artifacts.removeIf { it -> it.buildDependencies.getDependencies(null).contains(tasks.jar.get())}
element.get().outgoing.artifact(tasks.bootJar.get())
}
}
Edit : I can get away from the error by using the java Function class instead of kotlin lambda :
@Bean
fun transform(): Function<String, String> = Function {
it.toUpperCase()
}
But for now a new problem appear, the processor is unable to bind to kafka topic, it's shown as anonymous and the processor kepp status deploying. I got this in the processor log (I've tried in a java sample too) :
2021-01-19 08:13:27.674 INFO 107 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka version: 2.6.0
2021-01-19 08:13:27.674 INFO 107 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka commitId: 62abe01bee039651
2021-01-19 08:13:27.674 INFO 107 --- [ main] o.a.kafka.common.utils.AppInfoParser : Kafka startTimeMs: 1611044007674
2021-01-19 08:13:27.676 INFO 107 --- [ main] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Subscribed to topic(s): transform-in-0
2021-01-19 08:13:27.677 INFO 107 --- [ main] o.s.s.c.ThreadPoolTaskScheduler : Initializing ExecutorService
2021-01-19 08:13:27.702 INFO 107 --- [ main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@24fabd0f
2021-01-19 08:13:27.715 INFO 107 --- [ main] c.e.t.TransformerUppercaseApplicationKt : Started TransformerUppercaseApplicationKt in 7.538 seconds (JVM running for 8.374)
2021-01-19 08:13:27.729 INFO 107 --- [container-0-C-1] org.apache.kafka.clients.Metadata : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Cluster ID: gN63F8OzRwaEeKElBfkmjw
2021-01-19 08:13:28.827 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Discovered group coordinator kafka-broker:9092 (id: 2147482646 rack: null)
2021-01-19 08:13:28.833 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:28.866 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-01-19 08:13:28.866 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:31.890 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Finished assignment for group at generation 1: {consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2-61cd1b1a-6485-4bc8-83e8-003bc22db7e1=Assignment(partitions=[transform-in-0-0])}
2021-01-19 08:13:31.935 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Successfully joined group with generation 1
2021-01-19 08:13:31.936 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Notifying assignor about the new Assignment(partitions=[transform-in-0-0])
2021-01-19 08:13:31.939 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Adding newly assigned partitions: transform-in-0-0
2021-01-19 08:13:31.950 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.963 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.977 INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Resetting offset for partition transform-in-0-0 to offset 0.
2021-01-19 08:13:32.007 INFO 107 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2 : anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3: partitions assigned: [transform-in-0-0]
I managed to get it work ! The Kotlin lambda detection problem is a problem with the 2020.0.0 release. it works with the Hoxton.SR9
The anonymous consumer problems (in the Edit) was resolved by adding these two lines in the application.properies
spring.cloud.stream.function.bindings.transform-in-0=input
spring.cloud.stream.function.bindings.transform-out-0=output