Apache Kafka Streams Application with 2 functions not working

I got 2 applications and 3 Kafka topics. The first application produces messages into the pos-topic. This application is working and the messages are written into the topic. In the other application (which is not working) i got 2 functions listening to that topic :


fun hadoop(): Function<KStream<String, PosInvoice>, KStream<String, HadoopRecord>> {
    return Function { input: KStream<String, PosInvoice> ->
        input.mapValues { v -> recordBuilder.getMaskedInvoice(v) }
            .flatMapValues { v -> recordBuilder.getHadoopRecords(v) }


fun notification(): Function<KStream<String, PosInvoice>, KStream<String, Notification>> {
    return Function { input -> input.filter { _, v -> v.customerType.equals("PRIME", ignoreCase = true) }
        .mapValues { v -> recordBuilder.getNotification(v) }}

The hadoop-function schould produce to the hadoop-sink-topic and the notification-function should produce to the notification-topic. Both functions are within a @Configuration class.

The main class looks like this:

class JsonposfanoutApplication
fun main(args: Array<String>) {

And here is the application.yml :

        notification-in-0.destination: pos-topic
        notification-out-0.destination: loyalty-topic
        hadoop-in-0.destination: pos-topic
        hadoop-out-0.destination: hadoop-sink-topic
        definition: notification;hadoop
            brokers: localhost:9092
              schema.registry.url: http://localhost:8081
            notification-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
            hadoop-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde

The classes that are getting deserialized and serialized are Json-friendly. The first Application writes a Json-serialized value into the topic. So i do not need to explicit define my consumer.value.serdes and my key.serdes with Spring Cloud Streams right?

Whats going wrong?

I have no idea. I get no error or exception. The Application is just starting and finished after a few seconds. I have no info about my processors, so i think they do not even start. All suspicious i got from Spring Boot is an Info: @Bean method FunctionConfiguration.po is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.

What did I try?

I've tried alot in my application.yml but nothing helped and i reverted everything to the first version. Maybe i'm missing something elsewhere?


