I am working on a project using Reactor Kafka that consumes messages from a Kafka topic and posts them in batches to a REST endpoint. I am stuck on the batching part and on sending each batch to the endpoint. I need to read N messages (N is configurable) from the topic and then send those N messages to a REST endpoint. How can I read N messages using Reactor Kafka? I have looked at the examples in https://projectreactor.io/docs/kafka/release/reference/#_overview but couldn't find one that matches my problem. Any pointers on solving this would be really helpful.
Here is the code I have so far to consume messages from a topic:
@Slf4j
@Service
public class Service implements CommandLineRunner {

    @Autowired
    @Qualifier("KafkaConsumerTemplate")
    public ReactiveKafkaConsumerTemplate<String, String> KafkaConsumerTemplate;

    public Flux<String> consume() {
        return KafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset())
                )
                .map(ConsumerRecord::value)
                // log the consumed value itself (the lambda parameter), not the class name
                .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), metric))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume().subscribe();
    }
}
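You can use the buffer(int) operator on Flux for this. It collects incoming elements into a List and emits that List each time the given size is reached (or when the source completes).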
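To make the grouping behaviour concrete, here is a plain-Java sketch of what count-based buffering produces. It is only an illustration of the semantics, not Reactor's implementation; the class name BufferSketch and its buffer method are made up for this example.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferSketch {

    // Hypothetical helper: splits items into consecutive lists of at most `size`
    // elements, the same shape of output a count-based buffer hands downstream.
    public static <T> List<List<T>> buffer(List<T> items, int size) {
        if (size <= 0) {
            throw new IllegalArgumentException("size must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            int end = Math.min(i + size, items.size());
            // copy the slice so each batch is an independent list
            batches.add(new ArrayList<>(items.subList(i, end)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // five messages, batch size 2: the last batch is only partially filled
        System.out.println(buffer(List.of("m1", "m2", "m3", "m4", "m5"), 2));
        // prints [[m1, m2], [m3, m4], [m5]]
    }
}
```

Note that in the sketch the trailing partial batch appears because the input list ends. A Kafka-backed Flux normally does not complete, so with buffer(int) a partial batch waits until enough records arrive. If that matters, Flux also offers bufferTimeout(int, Duration), which emits when either the size or the time limit is reached.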
For your specific case:
int bufferSize = 10; // N; this could be injected from configuration instead

public Flux<List<String>> consume() {
    return KafkaConsumerTemplate.receiveAutoAck()
            .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                    consumerRecord.key(),
                    consumerRecord.value(),
                    consumerRecord.topic(),
                    consumerRecord.offset())
            )
            .map(ConsumerRecord::value)
            .doOnNext(metric -> log.debug("successfully consumed {}={}", Metric[].class.getSimpleName(), metric))
            // group the values into lists of bufferSize elements
            .buffer(bufferSize)
            .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
}

@Override
public void run(String... args) throws Exception {
    consume().subscribe(batch -> {
        // batch is a List<String> of buffered messages; here you can do whatever you want with your data.
    });
}
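As for what to do with each batch, one option is to turn it into a single POST request. The sketch below uses the JDK's java.net.http API and makes two assumptions that are not in the question: every Kafka message value is already a valid JSON document, and the endpoint lives at the placeholder URL http://localhost:8080/batch. The class BatchPostSketch and its methods are hypothetical names.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.List;

public class BatchPostSketch {

    // Placeholder endpoint, assumed for illustration only.
    static final URI ENDPOINT = URI.create("http://localhost:8080/batch");

    // Assumes each message is itself valid JSON, so joining the messages
    // with commas inside brackets yields a JSON array.
    public static String toJsonArray(List<String> batch) {
        return "[" + String.join(",", batch) + "]";
    }

    // Builds (but does not send) one POST request carrying the whole batch.
    public static HttpRequest buildRequest(List<String> batch) {
        return HttpRequest.newBuilder(ENDPOINT)
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJsonArray(batch)))
                .build();
    }

    public static void main(String[] args) {
        List<String> batch = List.of("{\"id\":1}", "{\"id\":2}");
        HttpRequest request = buildRequest(batch);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(toJsonArray(batch));
        // To actually send it:
        // HttpClient.newHttpClient().sendAsync(request, HttpResponse.BodyHandlers.ofString());
    }
}
```

In a reactive pipeline you would probably not block inside subscribe. A common pattern is to make the call from an operator such as concatMap or flatMap using a non-blocking client (for example Spring's WebClient), so that errors and backpressure stay part of the Flux.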