Tags: spring-boot, kotlin, gradle, spring-cloud-dataflow

Create Spring Cloud Data Flow Processor using Gradle and Kotlin


I'm trying to build a Kotlin processor based on Spring Cloud Stream 3.1 using a Kotlin lambda. I deploy this processor into a stream with Spring Cloud Data Flow, but the deployment is marked as failed.

The error log shows the following error: IllegalArgumentException: Type must be one of Supplier, Function or Consumer

org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'functionBindingRegistrar' defined in class path resource [org/springframework/cloud/stream/function/FunctionConfiguration.class]: Invocation of init method failed; nested exception is java.lang.IllegalArgumentException: Type must be one of Supplier, Function or Consumer
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.initializeBean(AbstractAutowireCapableBeanFactory.java:1788) ~[spring-beans-5.3.2.jar!/:5.3.2]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:609) ~[spring-beans-5.3.2.jar!/:5.3.2]
        at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:531) ~[spring-beans-5.3.2.jar!/:5.3.2]
        at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:335) ~[spring-beans-5.3.2.jar!/:5.3.2]

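Here is my code:

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.context.annotation.Bean

@SpringBootApplication
class TransformerUppercaseApplication {

    // Declared as a Kotlin function type; this is the bean that fails to bind.
    @Bean
    fun transform(): (String) -> String {
        return { "This is my uppercase".plus(it.toUpperCase()) }
    }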

}

fun main(args: Array<String>) {
    runApplication<TransformerUppercaseApplication>(*args)
}

I've tried specifying the function name in my application.yml, without success:

spring:
  cloud:
    stream:
      function:
        definition: transform

Here is my build.gradle.kts:

import org.jetbrains.kotlin.gradle.tasks.KotlinCompile

plugins {
    `maven-publish`
    id("org.springframework.boot") version "2.4.1"
    id("io.spring.dependency-management") version "1.0.10.RELEASE"
    kotlin("jvm") version "1.4.21"
    kotlin("plugin.spring") version "1.4.21"
}

group = "com.company"
version = "0.0.11"
java.sourceCompatibility = JavaVersion.VERSION_11

repositories {
    mavenCentral()
    maven { url = uri("https://repo.spring.io/milestone") }
}

extra["springCloudVersion"] = "2020.0.0"
dependencies {
    implementation("org.jetbrains.kotlin:kotlin-reflect")
    implementation("org.jetbrains.kotlin:kotlin-stdlib-jdk8")
    implementation("org.springframework.cloud:spring-cloud-starter-stream-kafka")
    implementation("org.springframework.cloud:spring-cloud-function-kotlin")
    testImplementation("org.springframework.boot:spring-boot-starter-test")
}

dependencyManagement {
    imports {
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}

tasks.withType<KotlinCompile> {
    kotlinOptions {
        freeCompilerArgs = listOf("-Xjsr305=strict")
        jvmTarget = "11"
    }
}

tasks.withType<Test> {
    useJUnitPlatform()
}
publishing {
    publications {
        create<MavenPublication>("mavenJava") {
            from(components["java"])
        }
    }
}

configurations {
    val elements = listOf(apiElements, runtimeElements)
    elements.forEach { element ->
        element.get().outgoing.artifacts.removeIf { it -> it.buildDependencies.getDependencies(null).contains(tasks.jar.get())}
        element.get().outgoing.artifact(tasks.bootJar.get())
    }
}

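Edit: I can get rid of the error by using the Java Function interface instead of a Kotlin lambda:

import java.util.function.Function

// Returns a SAM-converted java.util.function.Function rather than a Kotlin function type.
@Bean
fun transform(): Function<String, String> = Function {
    it.toUpperCase()
}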
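
A possible reason the Function wrapper behaves differently: on the JVM, a Kotlin lambda of type (String) -> String is not an instance of java.util.function.Function, so a framework that inspects the bean type needs extra Kotlin support to recognise it. The sketch below only illustrates that type difference in plain Kotlin; it does not reproduce the Spring Cloud Stream check itself. The names kotlinLambda and javaFunction are hypothetical, and uppercase() assumes Kotlin 1.5 or later (use toUpperCase() on 1.4.x).

```kotlin
import java.util.function.Function

// Hypothetical helper: same shape as the failing bean, a Kotlin function type.
fun kotlinLambda(): (String) -> String = { it.uppercase() }

// Hypothetical helper: same shape as the working bean, a Java functional interface.
fun javaFunction(): Function<String, String> = Function { it.uppercase() }

fun main() {
    // Widen to Any so the checks below are evaluated at runtime.
    val k: Any = kotlinLambda()
    val j: Any = javaFunction()

    println(k is Function<*, *>) // false: the Kotlin lambda is not a java.util.function.Function
    println(j is Function<*, *>) // true
}
```

Both helpers compute the same result when invoked; only the runtime type that a container would see differs.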

But now a new problem appears: the processor is unable to bind to the Kafka topic. The consumer is shown as anonymous and the processor stays in the deploying status. I got this in the processor log (I've tried with a Java sample too):


2021-01-19 08:13:27.674  INFO 107 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka version: 2.6.0
2021-01-19 08:13:27.674  INFO 107 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka commitId: 62abe01bee039651
2021-01-19 08:13:27.674  INFO 107 --- [           main] o.a.kafka.common.utils.AppInfoParser     : Kafka startTimeMs: 1611044007674
2021-01-19 08:13:27.676  INFO 107 --- [           main] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Subscribed to topic(s): transform-in-0
2021-01-19 08:13:27.677  INFO 107 --- [           main] o.s.s.c.ThreadPoolTaskScheduler          : Initializing ExecutorService
2021-01-19 08:13:27.702  INFO 107 --- [           main] s.i.k.i.KafkaMessageDrivenChannelAdapter : started org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter@24fabd0f
2021-01-19 08:13:27.715  INFO 107 --- [           main] c.e.t.TransformerUppercaseApplicationKt  : Started TransformerUppercaseApplicationKt in 7.538 seconds (JVM running for 8.374)
2021-01-19 08:13:27.729  INFO 107 --- [container-0-C-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Cluster ID: gN63F8OzRwaEeKElBfkmjw
2021-01-19 08:13:28.827  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Discovered group coordinator kafka-broker:9092 (id: 2147482646 rack: null)
2021-01-19 08:13:28.833  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:28.866  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group.
2021-01-19 08:13:28.866  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] (Re-)joining group
2021-01-19 08:13:31.890  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Finished assignment for group at generation 1: {consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2-61cd1b1a-6485-4bc8-83e8-003bc22db7e1=Assignment(partitions=[transform-in-0-0])}
2021-01-19 08:13:31.935  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.AbstractCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Successfully joined group with generation 1
2021-01-19 08:13:31.936  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Notifying assignor about the new Assignment(partitions=[transform-in-0-0])
2021-01-19 08:13:31.939  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Adding newly assigned partitions: transform-in-0-0
2021-01-19 08:13:31.950  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.963  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Found no committed offset for partition transform-in-0-0
2021-01-19 08:13:31.977  INFO 107 --- [container-0-C-1] o.a.k.c.c.internals.SubscriptionState    : [Consumer clientId=consumer-anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3-2, groupId=anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3] Resetting offset for partition transform-in-0-0 to offset 0.
2021-01-19 08:13:32.007  INFO 107 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder$2  : anonymous.899f643d-9e17-4354-824c-96e77ec4c5b3: partitions assigned: [transform-in-0-0]

Solution

  • I managed to get it working! The Kotlin lambda detection problem is specific to the 2020.0.0 release; it works with Hoxton.SR9.

    The anonymous consumer problem (described in the Edit) was resolved by adding these two lines to application.properties:

    spring.cloud.stream.function.bindings.transform-in-0=input
    spring.cloud.stream.function.bindings.transform-out-0=output
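
For the first part of the answer, the release-train change could look like the build.gradle.kts fragment below. This is a sketch under assumptions: it only swaps the springCloudVersion value from the question's build script. The answer does not say whether the Spring Boot plugin version was changed as well, so check that the Boot version is compatible with the Hoxton release train before relying on it.

```kotlin
// build.gradle.kts (fragment, not a complete build script)

// Pin the release train that the answer reports as working with Kotlin lambdas.
extra["springCloudVersion"] = "Hoxton.SR9"

dependencyManagement {
    imports {
        // Unchanged from the question's build script; it resolves the value set above.
        mavenBom("org.springframework.cloud:spring-cloud-dependencies:${property("springCloudVersion")}")
    }
}
```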