Tags: java, apache-kafka, reactive-programming, project-reactor, reactor-kafka

Concurrent processing with partition based ordering in Reactor Kafka


I am working on a sample application that reads from the different partitions of a Kafka topic, concurrently processes the records while preserving per-partition ordering, and writes the results to different partitions of another topic. This is the sample code I wrote:

    public class MetricsTransposer {
    private static final String BOOTSTRAP_SERVERS = "localhost:9092";

    static abstract class SetKafkaProperties {
        final String SOURCE_TOPIC;
        final String DESTINATION_TOPIC;
        final Map<String, Object> consumerProps;
        final Map<String, Object> producerProps;

        SetKafkaProperties(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            SOURCE_TOPIC = sourceTopic;
            DESTINATION_TOPIC = destTopic;

            consumerProps = new HashMap<String, Object>();
            consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "reactive-group-" + System.currentTimeMillis());
            consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
            consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
            consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
            consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            consumerProps.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "0"); 
            if(consumerPropsOverride != null) {
                consumerProps.putAll(consumerPropsOverride);
            }

            producerProps = new HashMap<String, Object>();
            producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
            producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0");
            producerProps.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
            producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
            producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            if(producerPropsOverride != null) {
                producerProps.putAll(producerPropsOverride);
            }
        }
    }

    static class ReactiveTranspose extends SetKafkaProperties {

        SenderOptions<Integer, String> senderOptions =
            SenderOptions.<Integer, String>create(producerProps)
                .maxInFlight(1024);

        KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);

        ReceiverOptions<Integer, String> receiverOptions =
            ReceiverOptions.<Integer, String>create(consumerProps)
                .subscription(Collections.singleton(SOURCE_TOPIC));


        ReactiveTranspose(Map<String, Object> consumerPropsOverride, Map<String, Object> producerPropsOverride, String bootstrapServers, String sourceTopic, String destTopic) {
            super(consumerPropsOverride, producerPropsOverride, bootstrapServers, sourceTopic, destTopic);
        }

        public Disposable ReadProcessWriteRecords() {
            Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
           return KafkaReceiver.create(receiverOptions)
                .receive()
                .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
                .groupBy(m -> m.receiverOffset().topicPartition())
                .flatMap(partitionFlux ->
                    partitionFlux.publishOn(scheduler)
                        .map(r -> processRecord(partitionFlux.key(), r))
                        .sample(Duration.ofMillis(5000))
                        .concatMap(offset -> offset.commit()))
                .subscribe();
        }

        private ReceiverOffset processRecord(TopicPartition topicPartition, ReceiverRecord<Integer, String> message) {
            System.out.printf("Processing record {} from partition {} in thread{}",
                message.value(), topicPartition, Thread.currentThread().getName());
            return message.receiverOffset();
        }
    }

    public static void RunReactiveTranformProcess(String sourceTopic, String destinationTopic) {
        ReactiveTranspose transpose = new ReactiveTranspose(null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
        transpose.ReadProcessWriteRecords();
    }

    public static void main(String[] args) throws Exception {
        String sourceTopic = "metrics";
        String destinationTopic = "cleanmetrics";

        RunReactiveTranformProcess(sourceTopic, destinationTopic);

    }
}

When I run the application, I am not seeing the print statements in the console, even though there is data in the topic to be consumed. So I wonder whether the code is connecting to the topic at all. I am looking for help figuring out how to check whether it is connecting and reading messages, or what else the issue could be.

I am a newbie to Java, reactive programming and Kafka. This is a self-learning project, so it is quite possible I am missing something simple and obvious.

More information: here is a snapshot of my logs. I have a topic named metrics with 3 partitions. [screenshot of logs]

Update: I wasn't seeing my print statements because, although there was data in the topic, auto.offset.reset was set to latest, so the consumer only picked up records produced after it subscribed. Changing it to earliest consumed the existing data.
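
For reference, that override can be passed through the existing consumerPropsOverride hook instead of editing the defaults. A minimal sketch, reusing the constructor from the code above:

    Map<String, Object> consumerOverrides = new HashMap<>();
    consumerOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Overrides are applied after the defaults via consumerProps.putAll(...)
    ReactiveTranspose transpose = new ReactiveTranspose(
        consumerOverrides, null, BOOTSTRAP_SERVERS, "metrics", "cleanmetrics");
    transpose.ReadProcessWriteRecords();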


Solution

  • Your problem is here:

    public void ReadProcessWriteRecords() {
        Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
    
        // Here you are ignoring the return
        // Nothing happens until you subscribe
        // So this is merely a statement, not an execution.
        KafkaReceiver.create(receiverOptions)
                     .receive()
                     .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
                     .groupBy(m -> m.receiverOffset().topicPartition())
                     .flatMap(partitionFlux ->
                         partitionFlux.publishOn(scheduler)
                             .map(r -> processRecord(partitionFlux.key(), r))
                             .sample(Duration.ofMillis(5000))
                             .concatMap(offset -> offset.commit()));
    }
    

    The Reactor documentation covers this in its getting-started section: nothing happens until you subscribe. In the code above you are assembling a reactive pipeline, but no one ever subscribes to it.
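
    A minimal illustration of that laziness with a plain Flux, unrelated to Kafka:

    Flux<String> pipeline = Flux.just("a", "b")
        .doOnNext(v -> System.out.println("saw " + v));
    // Nothing has printed yet; the pipeline is only assembled.

    pipeline.subscribe();
    // Only now does the pipeline run and print "saw a" and "saw b".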

    Since your application is the consumer of the stream, you should add a subscribe call somewhere.

    I personally wouldn't return void (you usually try to avoid void methods in reactive programming because they imply side effects and are hard to test); I would return the publisher all the way up to the main function so that the code can be unit tested. See the sketch after the main function below.

    The resulting main function would then look like this:

    public static void main(String[] args) throws Exception {
        String sourceTopic = "metrics";
        String destinationTopic = "cleanmetrics";
    
        RunReactiveTranformProcess(sourceTopic, destinationTopic).subscribe();
    
    }
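
    For that main to compile, RunReactiveTranformProcess (and ReadProcessWriteRecords) must return the pipeline instead of void. A minimal sketch of that refactor, using the same pipeline as the question (commit() returns Mono<Void>, so the whole chain is a Flux<Void>):

    public Flux<Void> ReadProcessWriteRecords() {
        Scheduler scheduler = Schedulers.newBoundedElastic(60, 60, "writerThreads");
        // Build and return the pipeline; the caller decides when to subscribe.
        return KafkaReceiver.create(receiverOptions)
                            .receive()
                            .doOnNext(r -> System.out.printf("Record received: %s%n", r.value()))
                            .groupBy(m -> m.receiverOffset().topicPartition())
                            .flatMap(partitionFlux ->
                                partitionFlux.publishOn(scheduler)
                                    .map(r -> processRecord(partitionFlux.key(), r))
                                    .sample(Duration.ofMillis(5000))
                                    .concatMap(ReceiverOffset::commit));
    }

    public static Flux<Void> RunReactiveTranformProcess(String sourceTopic, String destinationTopic) {
        ReactiveTranspose transpose = new ReactiveTranspose(null, null, BOOTSTRAP_SERVERS, sourceTopic, destinationTopic);
        return transpose.ReadProcessWriteRecords();
    }

    Note that subscribe() does not block; in a standalone main you may want to block instead (for example with blockLast()) so the JVM does not exit before any records arrive.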