Tags: spring-boot, kotlin, apache-kafka, apache-kafka-streams, spring-cloud-stream

Apache Kafka Streams Application with 2 functions not working


I have two applications and three Kafka topics. The first application produces messages into the pos-topic; it works, and the messages are written to the topic. In the other application (which is not working) I have two functions listening to that topic:

hadoop:

@Bean
fun hadoop(): Function<KStream<String, PosInvoice>, KStream<String, HadoopRecord>> {
    return Function { input: KStream<String, PosInvoice> ->
        input.mapValues { v -> recordBuilder.getMaskedInvoice(v) }    // mask the invoice
            .flatMapValues { v -> recordBuilder.getHadoopRecords(v) } // fan one invoice out into multiple records
    }
}

notification:

@Bean
fun notification(): Function<KStream<String, PosInvoice>, KStream<String, Notification>> {
    return Function { input ->
        input.filter { _, v -> v.customerType.equals("PRIME", ignoreCase = true) } // PRIME customers only
            .mapValues { v -> recordBuilder.getNotification(v) }
    }
}

The hadoop function should produce to the hadoop-sink-topic and the notification function should produce to the loyalty-topic. Both functions live in a @Configuration class.
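For context, a minimal sketch of how the enclosing pieces might fit together. The class names PosFanoutConfig and RecordBuilder are hypothetical, and the helper's method signatures are only inferred from the snippets above:

import org.springframework.context.annotation.Configuration
import org.springframework.stereotype.Component

// Hypothetical helper bean; names and types are inferred from the snippets
// above, and the bodies are placeholders rather than the real mapping logic.
@Component
class RecordBuilder {
    fun getMaskedInvoice(invoice: PosInvoice): PosInvoice = TODO("mask sensitive fields")
    fun getHadoopRecords(invoice: PosInvoice): List<HadoopRecord> = TODO("split one invoice into several records")
    fun getNotification(invoice: PosInvoice): Notification = TODO("build the loyalty notification")
}

// Hypothetical enclosing configuration class: the hadoop() and notification()
// beans shown above are declared here, with the helper constructor-injected.
@Configuration
class PosFanoutConfig(private val recordBuilder: RecordBuilder) {
    // @Bean fun hadoop(): ...       (as above)
    // @Bean fun notification(): ... (as above)
}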

The main class looks like this:

@SpringBootApplication
class JsonposfanoutApplication

fun main(args: Array<String>) {
    runApplication<JsonposfanoutApplication>(*args)
}

And here is the application.yml:

spring:
  cloud:
    stream:
      bindings:
        notification-in-0.destination: pos-topic
        notification-out-0.destination: loyalty-topic
        hadoop-in-0.destination: pos-topic
        hadoop-out-0.destination: hadoop-sink-topic
      function:
        definition: notification;hadoop
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
          bindings:
            notification-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
            hadoop-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde

The classes being serialized and deserialized are JSON-friendly, and the first application writes a JSON-serialized value into the topic. So I do not need to explicitly define my consumer value serdes and key serdes with Spring Cloud Stream, right?
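(For illustration: if explicit serdes were ever needed, they could be set per input binding, mirroring the producer entries above. This is only a sketch; whether KafkaJsonSchemaSerde is the right class on the consuming side depends on how the first application serializes.)

spring:
  cloud:
    stream:
      kafka:
        streams:
          bindings:
            notification-in-0.consumer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
            hadoop-in-0.consumer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde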

What's going wrong?

I have no idea. I get no error or exception. The application just starts and then finishes after a few seconds. There is no information about my processors, so I think they never even start. The only suspicious thing I get from Spring Boot is this INFO message: @Bean method FunctionConfiguration.po is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.

What did I try?

I've tried a lot in my application.yml, but nothing helped, so I reverted everything to the first version. Maybe I'm missing something elsewhere?


Solution

  • Changing org.springframework.cloud:spring-cloud-stream-binder-kafka-streams (version 3.1.2) to org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT solved my problem (the dependency change is sketched below). For some reason I did not get this exception in my application, although it did appear in another one: org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null. Here is the answer that helped: https://stackoverflow.com/a/66888771/15359624
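For reference, the version bump might look like this in a Gradle Kotlin DSL build (a sketch only; repo.spring.io/snapshot is Spring's standard snapshot repository, and the actual build script may differ):

// build.gradle.kts -- sketch of the dependency change described above
repositories {
    mavenCentral()
    maven("https://repo.spring.io/snapshot") // needed to resolve -SNAPSHOT artifacts
}

dependencies {
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT")
}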