elasticsearch, apache-kafka, reactive-programming, producer-consumer, reactor-kafka

Can Reactive Kafka Receiver work with non-reactive Elasticsearch client?


Below is sample code that uses reactor-kafka to read data (with retry logic) from a topic whose records are published by a non-reactive producer. Inside my doOnNext() consumer I use the non-reactive Elasticsearch client to index each record. There are a few things I am still unclear about:

  1. I know that consumers and producers are independent, decoupled systems, but if the consumers are reactive, is it recommended that the producer be reactive as well?
  2. If I am using something non-reactive, in this case the Elasticsearch client org.elasticsearch.client.RestClient, does the "reactiveness" of the code still work? Either way, how do I test it? (By "reactiveness" I mean the non-blocking IO part, i.e. if I spawn three reactive consumers and one becomes latent for some reason, its thread should be released and used for another reactive consumer.)
  3. In general: if I wrap an API with a reactive client, does the underlying API have to be reactive as well?

public <K, V> Disposable consumeRecords(ReceiverOptions<K, V> receiverOptions) {
    long maxAttempts = 3, duration = 10;
    RetryBackoffSpec retrySpec = Retry.backoff(maxAttempts, Duration.ofSeconds(duration)).transientErrors(true);
    Consumer<ReceiverRecord<K, V>> doOnNextConsumer = x -> {
        // use non-reactive elastic search client and index record x
    };

    return KafkaReceiver.create(receiverOptions)
            .receive()
            .doOnNext(record -> {
                try {
                    // calling the non-reactive consumer
                    doOnNextConsumer.accept(record);
                } catch (Exception e) {
                    throw new ReceiverRecordException(record, e);
                }
                record.receiverOffset().acknowledge();
            })
            .doOnError(t -> log.error("Error occurred: ", t))
            .retryWhen(retrySpec)
            .onErrorContinue((e, record) -> {
                ReceiverRecordException receiverRecordException = (ReceiverRecordException) e;
                log.error("Retries exhausted for: " + receiverRecordException);
                receiverRecordException.getRecord().receiverOffset().acknowledge();
            })
            .repeat()
            .subscribe();
}

Solution

  • I got some understanding of this after digging into it.

    The reactive KafkaReceiver only drives the pipeline; whatever you call inside it still runs on the receiver thread. If that call is a blocking (non-reactive) API, then even though KafkaReceiver itself is "reactive", you lose the non-blocking IO: the receiver thread stays blocked for the duration of the blocking call.
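
    One common way to keep the receive thread free (not part of the original code, just a sketch of the usual Reactor pattern) is to wrap the blocking Elasticsearch call in Mono.fromCallable and shift it onto Schedulers.boundedElastic() inside a flatMap. The index name, key/value types and RestClient wiring below are assumptions for illustration:

    import org.elasticsearch.client.Request;
    import org.elasticsearch.client.RestClient;

    import reactor.core.Disposable;
    import reactor.core.publisher.Mono;
    import reactor.core.scheduler.Schedulers;
    import reactor.kafka.receiver.KafkaReceiver;
    import reactor.kafka.receiver.ReceiverOptions;

    public class OffloadingConsumer {

        public static Disposable consume(ReceiverOptions<String, String> receiverOptions,
                                         RestClient restClient) {
            return KafkaReceiver.create(receiverOptions)
                    .receive()
                    .flatMap(record ->
                            Mono.fromCallable(() -> {
                                        // blocking low-level REST call that indexes the record
                                        Request request = new Request("PUT",
                                                "/my-index/_doc/" + record.key()); // hypothetical index name
                                        request.setJsonEntity(record.value());
                                        return restClient.performRequest(request);
                                    })
                                    // run the blocking work on boundedElastic, not on the receive thread
                                    .subscribeOn(Schedulers.boundedElastic())
                                    .doOnSuccess(response -> record.receiverOffset().acknowledge()),
                            3) // cap the number of in-flight blocking index calls
                    .subscribe();
        }
    }

    With this shape a slow index call only ties up a boundedElastic worker, while the receiver keeps emitting records, which is the behaviour the question calls "reactiveness".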

    You can test this out by creating a simple server that blocks calls for some time (e.g. sleeps) and calling that server from this receiver.
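
    A rough way to observe the same effect without standing up a real server is to substitute the latent call with a Thread.sleep inside doOnNext and log the thread name; the source below is just a stand-in for the receiver, not the actual KafkaReceiver:

    import java.time.Duration;

    import reactor.core.publisher.Flux;

    public class BlockingDemo {

        public static void main(String[] args) {
            // Stand-in source for records; in the real pipeline this would be
            // KafkaReceiver.create(receiverOptions).receive().
            Flux.interval(Duration.ofMillis(100))
                    .take(5)
                    .doOnNext(tick -> {
                        System.out.printf("indexing %d on %s%n", tick, Thread.currentThread().getName());
                        try {
                            Thread.sleep(2_000); // simulate the latent, blocking "server" / index call
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    })
                    .blockLast();
            // All five elements print on the same thread, roughly 2 s apart: the
            // blocking call holds that thread, so no other work is interleaved.
        }
    }

    Swapping the doOnNext for the flatMap + boundedElastic shape sketched above lets the slow calls overlap instead of serializing on the one thread.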