I've configured my consumer to accept messages from a topic in batches. How do I forward them to a new topic?
I want each consumed message to be forwarded as its own message, so X consumed messages will produce X outgoing messages.
Here's my current setup:
@KafkaListener(topics = "input")
@SendTo("output")
public ConsumerRecords consume(ConsumerRecords records) {
    // Do things
    return records;
}
And here's the exception thrown:
org.springframework.kafka.KafkaException: No method found for class java.util.ArrayList
Returning ConsumerRecords from a @SendTo listener is not supported. In any case, you can't send a ConsumerRecord to a Producer.
This works, though:
@KafkaListener(id = "foo", topics = "input")
@SendTo("output")
public List<String> consume(List<String> data) {
    return data;
}
(where String is the type created by your deserializer).
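If the "Do things" step has to change each payload, the body of that listener could build a new list of the same size and return it. Here is a minimal, library-free sketch of that step; the class name BatchForwardSketch and the transform logic are hypothetical placeholders, not part of Spring Kafka, and it assumes the payloads are Strings as in the example above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

public class BatchForwardSketch {

    // Hypothetical per-message step standing in for "Do things".
    static String transform(String payload) {
        return payload.trim().toUpperCase(Locale.ROOT);
    }

    // Maps a consumed batch to a new list with exactly one element per input,
    // so X consumed payloads yield X outgoing payloads.
    static List<String> process(List<String> batch) {
        List<String> out = new ArrayList<>(batch.size());
        for (String payload : batch) {
            out.add(transform(payload));
        }
        return out;
    }

    public static void main(String[] args) {
        // Three payloads in, three payloads out.
        System.out.println(process(Arrays.asList(" a ", "b", "c ")));
    }
}
```

In the listener this would amount to something like return BatchForwardSketch.process(data); in place of return data;.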