Tags: java, spring-boot, spring-kafka

Spring Cloud Stream: sending Kafka events to multiple output topics


I have a Spring Boot app that sends Kafka events to a topic with a specific Avro schema. The demo code looks like this:

@Configuration
@Slf4j
public class KafkaListener {

  @Bean
  public Function<List<DummyClass>, List<DummyClass>> acceptEvent() {
     return messages -> {
        List<DummyClass> output = new ArrayList<>();
        messages.forEach(msg -> {
          // do work and add items to the output field
        });
        // some more work
        return output;
     };
  }
}

DummyClass is a class generated from an Avro schema (the .avsc file lives in the resources folder and the class is generated by the Avro plugin).

So, a fairly standard setup. Now, instead of sending events to a single topic as the code above does, I want to send events to two topics.

After searching online, I found a Spring documentation page with an example that uses tuples. I gave it a go and implemented everything except the Flux part (as I don't want it). The new code ended up like this:

@Configuration
@Slf4j
public class KafkaListener {

  @Bean
  public Function<List<DummyClass>, Tuple2<List<DummyClass>,List<DummyClass>>> acceptEvent() {
     return messages -> {
        List<DummyClass> output1 = new ArrayList<>();
        List<DummyClass> output2 = new ArrayList<>();
        messages.forEach(msg -> {
          // do work and add items to the output1 and output2 fields
        });
        // some more work
        return Tuples.of(output1,output2);
     };
  }
}

However, that does not seem to work as I am getting the following exception:

java.lang.UnsupportedOperationException: At the moment only Tuple-based function are supporting multiple arguments

I suspect this is because I am not using Flux. This answer suggested that the wrong imports might be in use, but I verified that everything I use comes from the correct packages.

Someone with a slightly different implementation ran into a similar problem, as described in this article, and resolved it by returning a custom object that holds the outputs as fields. I tried the same, but running the app results in the following exception:

Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord

With that, I have exhausted my options for now. So my question is: how can I write events to multiple topics? If one of my implementations is the right approach, what did I do wrong? Are there other things to try? I have seen KStream, which is said to be able to do what I want, but I cannot find a good code example.


Solution

  • After some back and forth, I found StreamBridge to be the best option for this.

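    import java.util.ArrayList;
    import java.util.List;
    import java.util.function.Function;

    import lombok.AllArgsConstructor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.cloud.stream.function.StreamBridge;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.messaging.Message;

    @Configuration
    @Slf4j
    @AllArgsConstructor
    public class KafkaListener {
    
      // Injected through the Lombok-generated constructor
      private final StreamBridge streamBridge;
    
      @Bean
      public Function<List<DummyClass>, List<DummyClass>> acceptEvent() {
         return messages -> {
            List<DummyClass> output = new ArrayList<>();
            messages.forEach(msg -> {
              // do work and add the items you want sent to topic 1 to the output list
              if (condition) { // placeholder: whatever decides that an event also goes to topic 2
                // createMsg() is a placeholder method that builds the event for topic 2
                Message<DummyClass> msgToSendViaStreamBridge = createMsg();
                streamBridge.send("acceptEvent-out-1", msgToSendViaStreamBridge);
              }
            });
            // some more work
            return output;
         };
      }
    }
    

    With this, the Function still sends whatever is added to the output list to the existing topic (the acceptEvent-out-0 binding defined in application.yml), while StreamBridge sends the additional events to a different topic, defined by the acceptEvent-out-1 binding. The same approach works for as many topics as you need.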
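
The routing pattern above can also be sketched without any Spring dependency, which may help when unit-testing the decision logic on its own. This is only an illustration under stated assumptions: the class `MultiTopicRoutingSketch`, the `routing` method, and the `sideChannel` callback are hypothetical names, and the `BiConsumer<String, T>` merely stands in for a call such as `streamBridge.send(bindingName, payload)`. For simplicity the sketch puts every message on the primary output, whereas the real function decides per message what goes into the output list. In the real application the two binding names would normally be mapped to topics through `spring.cloud.stream.bindings.<binding-name>.destination` properties.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.Predicate;

public class MultiTopicRoutingSketch {

    /**
     * Builds a batch function in the style of acceptEvent(): every message is
     * added to the returned list (the primary output), and messages matching
     * the predicate are additionally handed to the side channel.
     *
     * sideChannel is a hypothetical stand-in for StreamBridge.send(binding, payload).
     */
    public static <T> Function<List<T>, List<T>> routing(
            Predicate<T> goesToSecondTopic,
            String secondaryBinding,
            BiConsumer<String, T> sideChannel) {
        return messages -> {
            List<T> output = new ArrayList<>();
            for (T msg : messages) {
                output.add(msg); // returned list -> primary binding (out-0)
                if (goesToSecondTopic.test(msg)) {
                    // side channel -> secondary binding (out-1)
                    sideChannel.accept(secondaryBinding, msg);
                }
            }
            return output;
        };
    }

    public static void main(String[] args) {
        // Records what would have been sent through the side channel
        List<String> sentToSecond = new ArrayList<>();

        Function<List<String>, List<String>> fn = routing(
                s -> s.startsWith("audit"),
                "acceptEvent-out-1",
                (binding, payload) -> sentToSecond.add(binding + ":" + payload));

        List<String> primary = fn.apply(List.of("order-1", "audit-7", "order-2"));

        System.out.println("primary output: " + primary);
        System.out.println("side channel:   " + sentToSecond);
    }
}
```

Keeping the side channel behind a small functional interface means the condition can be tested with a plain list, and the production bean only has to pass `streamBridge::send` (or a lambda wrapping it) in its place.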