Search code examples
kotlinapache-kafkaapache-kafka-streamsspring-kafka

Spring kafka - kafka streams topology set up


We have a kafka streams spring boot app(using spring-kafka), this app currently reads messages from an upstream topic applies some transformation, and writes them to a downstream topic, it does not do any aggregates or joins or any advanced kafka streams feature.

The code currently looks similar to this

@Bean 
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SomeObject> {
  val stream = streamsBuilder.stream<String, SomeObject>(inputTopicName)
  val branches: Array<KStream<String, SomeObject>> = stream.branch(
    { _, value -> isValidRawData(value)},
    { _, failedValue -> true}
  )
        
  branches[0].map { _, value -> transform(value) }.to(outputTopicName)
  branches[1].foreach { _, value -> s3Service.uploadEvent(value) }
}

This is working fine for use but now we need to extend this code to consume messages of a different schema from a second upstream topic and apply a slightly different transformation and then write them to the same downstream topic(with a similar schema) as the topology above.

In order to achieve this we have 2 options;

  1. Create a second @Bean factory method almost similar to the one above except that its topology consumes from a separate topic and applies a different transformation.

  2. Modify to the topology above to consume both topics, create a third branch for messages from the second topic as follows

@Bean 
fun topology(streamsBuilder: StreamsBuilder): KStream<String, SpecificRecord> {
  val topics = listOf("topic1", "topic2")
  val stream = streamsBuilder.stream<String, SpecificRecord>(topics)
  val branches: Array<KStream<String,SpecificRecord>> = stream.branch(
    { _, value -> isRecordFromTopic1(value)},
    { _, value -> isRecordFromTopic2(value)},
    { _, failedValue -> true}
  )
        
  branches[0].map { _, value -> transformTopic1Record(value) }.to(outputTopicName)
  branches[1].map { _, value -> transformTopic2Record(value) }.to(outputTopicName)
  branches[2].foreach { _, value -> s3Service.uploadEvent(value) }
}

Which one of these approaches would be the recommended one? Are there things we need to consider from a kafka streams resource management or performance perspective?

Thanks for you suggestions.


Solution

  • Since there is that collection of topics API as you show in the second code, I would say both variant are valid and makes sense. Everything else is just a personal preference. I would go the first one since technically in the end everything is going to work on the same Streams engine. The first solution is much easier to support in the future when you would introduce a third record type and so on. Or you may have extra logic for the specific stream. You may have a common stream to read from all the topics and distribute them via that condition and branches. The rest of logic you may do in their individual stream via their own intermediate topics. But still: just my opinion...