I have 2 applications and 3 Kafka topics. The first application produces messages into the pos-topic. This application works, and the messages are written into the topic. In the other application (which is not working) I have 2 functions listening to that topic:
hadoop:
@Bean
fun hadoop(): Function<KStream<String, PosInvoice>, KStream<String, HadoopRecord>> {
    return Function { input: KStream<String, PosInvoice> ->
        input.mapValues { v -> recordBuilder.getMaskedInvoice(v) }
            .flatMapValues { v -> recordBuilder.getHadoopRecords(v) }
    }
}
notification:
@Bean
fun notification(): Function<KStream<String, PosInvoice>, KStream<String, Notification>> {
    return Function { input ->
        input.filter { _, v -> v.customerType.equals("PRIME", ignoreCase = true) }
            .mapValues { v -> recordBuilder.getNotification(v) }
    }
}
The hadoop function should produce to the hadoop-sink-topic, and the notification function should produce to the notification-topic. Both functions are within a @Configuration class.
The main class looks like this:
@SpringBootApplication
class JsonposfanoutApplication

fun main(args: Array<String>) {
    runApplication<JsonposfanoutApplication>(*args)
}
And here is the application.yml:
spring:
  cloud:
    stream:
      bindings:
        notification-in-0.destination: pos-topic
        notification-out-0.destination: loyalty-topic
        hadoop-in-0.destination: pos-topic
        hadoop-out-0.destination: hadoop-sink-topic
      function:
        definition: notification;hadoop
      kafka:
        streams:
          binder:
            brokers: localhost:9092
            configuration:
              schema.registry.url: http://localhost:8081
          bindings:
            notification-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
            hadoop-out-0.producer.valueSerde: io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
The classes being deserialized and serialized are JSON-friendly. The first application writes a JSON-serialized value into the topic. So I do not need to explicitly define my consumer.value.serdes and my key.serdes with Spring Cloud Stream, right?
What's going wrong?
I have no idea. I get no error or exception. The application just starts and exits after a few seconds. I see no information about my processors, so I think they do not even start.
The only suspicious thing I get from Spring Boot is an INFO message:
@Bean method FunctionConfiguration.po is non-static and returns an object assignable to Spring's BeanFactoryPostProcessor interface. This will result in a failure to process annotations such as @Autowired, @Resource and @PostConstruct within the method's declaring @Configuration class. Add the 'static' modifier to this method to avoid these container lifecycle issues; see @Bean javadoc for complete details.
What did I try?
I've tried a lot of changes in my application.yml, but nothing helped, so I reverted everything to the first version. Maybe I'm missing something elsewhere?
Changing org.springframework.cloud:spring-cloud-stream-binder-kafka-streams (version 3.1.2) to org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT solved my problem. For some reason I did not get the following exception in this application, but I did get it in another one:
org.springframework.context.ApplicationContextException: Failed to start bean 'streamsBuilderFactoryManager'; nested exception is org.springframework.kafka.KafkaException: Could not start stream: ; nested exception is java.lang.IllegalArgumentException: 'listener' cannot be null
Here is the answer that helped: https://stackoverflow.com/a/66888771/15359624
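For anyone who wants to try the same version bump, here is a minimal sketch of how it might look. It assumes a Gradle build using the Kotlin DSL, which is a guess, since the build file is not shown above. The snapshot repository entry is also an assumption: snapshot artifacts are not published to Maven Central, so the build has to point at Spring's snapshot repository to resolve them.

```kotlin
// build.gradle.kts (hypothetical; the actual build file is not part of the question)
repositories {
    mavenCentral()
    // Assumption: -SNAPSHOT artifacts are resolved from Spring's snapshot repository.
    maven { url = uri("https://repo.spring.io/snapshot") }
}

dependencies {
    // Previously: org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.2
    implementation("org.springframework.cloud:spring-cloud-stream-binder-kafka-streams:3.1.3-SNAPSHOT")
}
```

If the binder version is managed by a Spring Cloud BOM in your build, the explicit version here would override it only for this one artifact. Moving to a release version that contains the fix is probably preferable to staying on a snapshot.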