I have a Spring Boot app that sends Kafka events to a topic with a specific Avro schema. The demo code is as follows:
@Configuration
@Slf4j
public class KafkaListener {

    @Bean
    public Function<List<DummyClass>, List<DummyClass>> acceptEvent() {
        return messages -> {
            List<DummyClass> output = new ArrayList<>();
            messages.forEach(msg -> {
                // do work and add items to the output list
            });
            // some more work
            return output;
        };
    }
}
And DummyClass is a class generated from an Avro schema (the .avsc file is defined in the resources folder and the class is generated by the Avro plugin).
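For context, the function is wired to its topic through bindings in application.yml. A minimal sketch of that configuration, assuming Spring Cloud Stream's default acceptEvent-in-0/acceptEvent-out-0 binding naming convention; the destination names are placeholders of my own:

```yaml
spring:
  cloud:
    function:
      definition: acceptEvent
    stream:
      bindings:
        acceptEvent-in-0:
          destination: input-topic    # placeholder topic name
        acceptEvent-out-0:
          destination: output-topic   # placeholder topic name
```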
So a fairly standard setup. Now, instead of sending events to a single topic as the code above does, I want to be able to send events to 2 topics.
After searching online, I found Spring documentation illustrating an example using tuples. So I gave it a go and implemented all of it except the flux part (as I don't want it). The new code ended up as follows:
@Configuration
@Slf4j
public class KafkaListener {

    @Bean
    public Function<List<DummyClass>, Tuple2<List<DummyClass>, List<DummyClass>>> acceptEvent() {
        return messages -> {
            List<DummyClass> output1 = new ArrayList<>();
            List<DummyClass> output2 = new ArrayList<>();
            messages.forEach(msg -> {
                // do work and add items to the output1 and output2 lists
            });
            // some more work
            return Tuples.of(output1, output2);
        };
    }
}
However, that does not seem to work, as I get the following exception:
java.lang.UnsupportedOperationException: At the moment only Tuple-based function are supporting multiple arguments
I suspect this is because I am not using Flux. This answer suggested that wrong imports might be in use, but I verified that everything I use comes from the correct imports.
A slightly different implementation ran into a similar problem, as described in this article. That person, however, managed to resolve the issue by passing a custom object containing the output arguments as fields. I attempted to do the same, but running the app results in the following exception:
Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
And with that, I have exhausted my options for now. My question is as follows: how can I achieve writing events to multiple topics? If one of my implementations is the way to go, what did I do wrong? Are there other things to try? I have seen KStream, which is said to be able to do what I want, but I cannot find a good code example.
After some back and forth, I found StreamBridge to be the best option to achieve this.
@Configuration
@Slf4j
@AllArgsConstructor
public class KafkaListener {

    private final StreamBridge streamBridge;

    @Bean
    public Function<List<DummyClass>, List<DummyClass>> acceptEvent() {
        return messages -> {
            List<DummyClass> output = new ArrayList<>();
            messages.forEach(msg -> {
                // do work and add items to the output list you want sent to topic 1
                if (condition) {
                    Message<DummyClass> msgToSendViaStreamBridge = createMsg(); // method that creates an event for StreamBridge to send to topic 2
                    streamBridge.send("acceptEvent-out-1", msgToSendViaStreamBridge);
                }
            });
            // some more work
            return output;
        };
    }
}
So now my Function sends events to the existing topic (defined by the acceptEvent-out-0 binding in the application.yml file) by putting them into the output list, and it sends events via StreamBridge to a different topic defined by the acceptEvent-out-1 binding. This approach can be used to send events to as many topics as you want.
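For completeness, both bindings need to exist in application.yml for this to work. A minimal sketch, where the destination names are placeholders I made up:

```yaml
spring:
  cloud:
    stream:
      bindings:
        acceptEvent-in-0:
          destination: input-topic     # placeholder
        acceptEvent-out-0:
          destination: first-topic     # placeholder; receives the returned output list
        acceptEvent-out-1:
          destination: second-topic    # placeholder; receives messages sent via StreamBridge
```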