apache-kafka · spring-webflux · spring-kafka · project-reactor · reactor-kafka

Read from a topic and write the messages in batches to a REST endpoint using Reactor Kafka


I am working on a project using Reactive Kafka that consumes messages from a Kafka topic and posts them in batches to a REST endpoint. I am stuck on the batching part and on sending each batch to the endpoint. I need to read N messages (where N is configurable) from the topic and then send those N messages to a REST endpoint. How can I read N messages using Reactor Kafka? I have looked at the examples in https://projectreactor.io/docs/kafka/release/reference/#_overview but couldn't find one that matches my problem. Any pointers on solving this would be really helpful.

Here is the code I have so far to consume messages from a topic:

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.CommandLineRunner;
import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;

@Slf4j
@Service
public class KafkaConsumerService implements CommandLineRunner {

    @Autowired
    @Qualifier("KafkaConsumerTemplate")
    private ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;

    public Flux<String> consume() {
        return kafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset()))
                .map(ConsumerRecord::value)
                .doOnNext(value -> log.debug("successfully consumed value={}", value))
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume().subscribe();
    }
}

Solution

  • You can use the buffer(int) operator for this. It collects the incoming elements into lists of the given size and emits each list downstream, so consume() now returns a Flux<List<String>>.

    For your specific case:

    public Flux<List<String>> consume() {
        int bufferSize = 10;
        return kafkaConsumerTemplate.receiveAutoAck()
                .doOnNext(consumerRecord -> log.info("received key={}, value={} from topic={}, offset={}",
                        consumerRecord.key(),
                        consumerRecord.value(),
                        consumerRecord.topic(),
                        consumerRecord.offset()))
                .map(ConsumerRecord::value)
                .doOnNext(value -> log.debug("successfully consumed value={}", value))
                // collects bufferSize values into a List<String> and emits the list downstream
                .buffer(bufferSize)
                .doOnError(throwable -> log.error("Error while consuming : {}", throwable.getMessage()));
    }

    @Override
    public void run(String... args) throws Exception {
        consume().subscribe(batch -> {
            // batch is a List of buffered values, here you can do whatever you want with your data
        });
    }
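
    To send each batch to the REST endpoint, you can replace the subscribe callback with a reactive HTTP call. Below is a minimal sketch using Spring WebFlux's WebClient; the base URL, the /metrics path, and the BatchPostingService class name are assumptions you would adapt to your own setup.

    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.springframework.kafka.core.reactive.ReactiveKafkaConsumerTemplate;
    import org.springframework.stereotype.Service;
    import org.springframework.web.reactive.function.client.WebClient;
    import reactor.core.publisher.Flux;

    @Slf4j
    @Service
    public class BatchPostingService {

        private final ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate;
        private final WebClient webClient;

        public BatchPostingService(ReactiveKafkaConsumerTemplate<String, String> kafkaConsumerTemplate,
                                   WebClient.Builder webClientBuilder) {
            this.kafkaConsumerTemplate = kafkaConsumerTemplate;
            // the base URL is an assumption; point it at your real REST service
            this.webClient = webClientBuilder.baseUrl("http://localhost:8080").build();
        }

        public Flux<String> consumeAndPost(int batchSize) {
            return kafkaConsumerTemplate.receiveAutoAck()
                    .map(ConsumerRecord::value)
                    // group batchSize values into a List<String>
                    .buffer(batchSize)
                    // POST one batch at a time; concatMap keeps the batches in order
                    .concatMap(batch -> webClient.post()
                            .uri("/metrics")          // hypothetical endpoint path
                            .bodyValue(batch)
                            .retrieve()
                            .bodyToMono(String.class))
                    .doOnError(throwable -> log.error("Error while posting batch : {}", throwable.getMessage()));
        }
    }

    concatMap posts one batch at a time and preserves ordering; switch to flatMap if concurrent requests are acceptable. Also note that buffer(int) holds on to an incomplete batch until it fills up, so if the topic can go quiet for a while you may prefer bufferTimeout(batchSize, Duration.ofSeconds(5)), which emits a partial batch once the timeout elapses.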