Search code examples
apache-kafkaspring-webfluxproject-reactorreactorreactor-kafka

Kafka reactor - How to disable KAFKA consumer being autostarted?


Below is my KAFKA consumer

@Bean("kafkaConfluentInboundReceiver")
@ConditionalOnProperty(value = "com.demo.kafka.core.inbound.confluent.topic-name",
        matchIfMissing = false)
public KafkaReceiver<String, Object> kafkaInboundReceiver() {
    ReceiverOptions<String, Object> receiverOptions = ReceiverOptions.create(inboundConsumerConfigs());
    receiverOptions.schedulerSupplier(() -> Schedulers
            .fromExecutorService(applicationContext.getBean("inboundKafkaExecutorService", ExecutorService.class)));
    receiverOptions.maxCommitAttempts(kafkaProperties.getKafka().getCore().getMaxCommitAttempts());
    return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
            .subscription(Collections.singleton(
                    kafkaProperties.getKafka()
                            .getCore().getInbound().getConfluent()
                            .getTopicName()))
            .commitInterval(Duration.ZERO).commitBatchSize(0));
}

My KAFKA consumer is getting started automatically. However I want to disable KAFKA consumer being autostarted.

I got to know that, In spring KAFKA we can do something like this

factory.setAutoStartup(start);

however, I am not sure how I achieve(control auto start/stop behavior) in Kafka reactor. I want to have something like below

Introducing a property to handle the auto start/stop behavior

@Value("${consumer.autostart:true}")
private boolean start;

using the above property I should be able to set the KAFKA Auto-Start flag in Kafka reactor, something like this

return KafkaReceiver.create(receiverOptions.addAssignListener(Collection::iterator)
        .subscription(Collections.singleton(
                kafkaProperties.getKafka()
                        .getCore().getInbound().getConfluent()
                        .getTopicName()))
        .commitInterval(Duration.ZERO).commitBatchSize(0)).setAutoStart(start);

Note: .setAutoStart(start);

Is this doable in Kafka reactor, if so, how do I do it?

Update:

protected void inboundEventHubListener(String topicName, List<String> allowedValues) {
    Scheduler scheduler = Schedulers.fromExecutorService(kafkaExecutorService);
    kafkaEventHubInboundReceiver
            .receive()
            .publishOn(scheduler)
            .groupBy(receiverRecord -> {
                try {
                    return receiverRecord.receiverOffset().topicPartition();
                } catch (Throwable throwable) {
                    log.error("exception in groupby", throwable);
                    return Flux.empty();
                }
            }).flatMap(partitionFlux -> partitionFlux.publishOn(scheduler)
            .map(record -> {
                processMessage(record, topicName, allowedValues).block(
                        Duration.ofSeconds(60L));//This subscribe is to trigger processing of a message
                return record;
            }).concatMap(message -> {
                log.info("Received message after processing offset: {} partition: {} ",
                         message.offset(), message.partition());
                return message.receiverOffset()
                        .commit()
                        .onErrorContinue((t, o) -> log.error(
                                String.format("exception raised while commit offset %s", o), t)
                        );
            })).onErrorContinue((t, o) -> {
        try {
            if (null != o) {
                ReceiverRecord<String, Object> record = (ReceiverRecord<String, Object>) o;
                ReceiverOffset offset = record.receiverOffset();
                log.debug("failed to process message: {} partition: {} and message: {} ",
                          offset.offset(), record.partition(), record.value());
            }
            log.error(String.format("exception raised while processing message %s", o), t);
        } catch (Throwable inner) {
            log.error("encountered error in onErrorContinue", inner);
        }
    }).subscribeOn(scheduler).subscribe();

Can I do something like this?

kafkaEventHubInboundReceiverObj = kafkaEventHubInboundReceiver.....subscribeOn(scheduler);
if(consumer.autostart) {
kafkaEventHubInboundReceiverObj.subscribe();
}

Solution

  • With reactor-kafka there is no concept of "auto start"; you are in complete control.

    The consumer is not "started" until you subscribe to the Flux returned from receiver.receive().

    Simply delay the flux.subscribe() until you are ready to consume data.