Tags: java, apache-kafka, kafka-consumer-api, jaas, confluent-cloud

Kafka Consumer empty records


There are a lot of questions about this topic; however, this is NOT a duplicate question!

The problem I'm facing is that I tried to set up a Spring Boot project with Java 14 and Kafka 2.5.0, and my consumer returns an empty list of records. Most answers here point to some forgotten property, to polling frequently, or to setting auto.offset.reset to earliest.

I can't see any logical difference from docs.confluent.io, even though my configuration seems unconventional (see how I set the jaas.conf in the snippet below).

@EnableKafka
@Configuration
public class KafkaConfig {

    @Bean
    public KafkaConsumer<Long, MyClass> consumerConfigs() {
        Properties config = new Properties();

        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");

        config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
        config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
        config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
        // MyClass implements its own Deserializer (more on the custom (de)serializers below)
        config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);

        // point the JVM at the JAAS file that holds the PLAIN credentials
        System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");

        return new KafkaConsumer<>(config);
    }
}

This works, however: I receive no exceptions (Kafka or otherwise), and the connection appears to be made.

// jaas.conf-file
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    serviceName="serviceName"
    username="username"
    password="password";
};
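
For completeness: I believe the same credentials could also be passed inline through the sasl.jaas.config consumer property, instead of pointing a system property at a separate file (a sketch, not what I'm actually running):

config.put(SaslConfigs.SASL_JAAS_CONFIG,
        "org.apache.kafka.common.security.plain.PlainLoginModule required "
                + "username=\"username\" password=\"password\";");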

Here is where I'm actually polling:

try {
    KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
    consumer.subscribe(Collections.singletonList(inputTopic));

    int count = 0;
    long start = System.currentTimeMillis();
    long end = System.currentTimeMillis();

    // in production this condition would be a boolean flag
    while (end - start < 900_000) {
        ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(record -> {
            MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
            System.out.println(result);
        });

        consumer.commitSync();

        System.out.println("visualize number of loops made: " + ++count);
        end = System.currentTimeMillis();
    }
} catch (KafkaException e) {
    e.printStackTrace();
} catch (Exception e) {
    System.out.println(e.getMessage());
}

I added the prints and other clutter while trying to find the issue. I ran my program in debug mode and placed a breakpoint on this line:

MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());

As a result, I see a printed line with the count every second, as one would expect. But since my consumer returns no records, it never enters the forEach and thus never hits my breakpoint.

I can definitely see my topic in the cloud, with two partitions. Messages are generated in a steady stream, so I know I should be able to pick up something.
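
From the client side, one way to double-check (rather than trusting the absence of exceptions) would be something like this; unlike poll(), partitionsFor() fails with an exception, e.g. a TimeoutException, when the broker is unreachable or the credentials are rejected (a sketch, not part of my actual code):

List<PartitionInfo> partitions = consumer.partitionsFor(inputTopic, Duration.ofSeconds(10));
System.out.println("Partitions visible to the client: " + partitions.size());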

I know it can take some time to connect to the cluster, but with the window currently set to a quarter of an hour, I should receive at least something, right? As an alternative, I tried switching from consumer.subscribe() to the consumer.assign() method, where I specified my TopicPartitions and called consumer.seekToBeginning(). It ran fine, but also returned nothing.
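
The assign() variant looked roughly like this (a sketch from memory; the two partition numbers match my topic):

TopicPartition partition0 = new TopicPartition(inputTopic, 0);
TopicPartition partition1 = new TopicPartition(inputTopic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
// rewind every assigned partition to its earliest available offset
consumer.seekToBeginning(consumer.assignment());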

One other thing that's not found in the most common examples is that I use my own classes. So instead of KafkaConsumer<String, String>, I implemented custom (de)serializers according to this tutorial.

Could it be my configuration settings? Something wrong with the poll timeout? The (de)serialization, or something else entirely? I genuinely can't pinpoint why I'm getting zero records. Any feedback would be greatly appreciated!


Solution

  • Problem solved. It wasn't anything you could determine from my posted question; nevertheless, I want to clarify a few things, should someone else find themselves stuck with a similar configuration.

    1. Verify that the received password is indeed the correct one. Facepalm.

    I thought the consumer was making a connection to the cluster, but in reality my loop kept printing the count because the .poll(Duration.ofMillis(1000)) method executes, checks whether it can connect within the given timeout, and moves on, returning zero records when the connection fails. No error is thrown. Normally a connection should be made within two seconds or so. (A guard that surfaces this is sketched after the polling method below.)

    2. Check your connection to the database.

    You never want the application to stop, which is why I designed the myOwnKafkaService.getSomethingFromRecord(record.key(), record.value()) method to log all errors while catching all exceptions. It wasn't until I checked the logs that I realized my permissions on the remote database weren't in order.

    3. What's known as a TIMESTAMP should be deserialized into java.util.Date.

    Parsed wrongly, it throws an exception, but my method returned null. Like all the remarks in this answer, this one boils down to my inexperience with such setups. You'll find the rectified classes below to serve as a working example (though by no means entirely best practice).

    the KafkaConfig:

    @EnableKafka
    @Configuration
    public class KafkaConfig {
    
        @Bean
        public KafkaConsumer<Long, MyClass> consumerConfigs() {
            Properties config = new Properties();
    
            config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
            config.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    
            config.put(ConsumerConfig.CLIENT_ID_CONFIG, "serviceName");
            config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "confluent-cloud");
            config.put(ConsumerConfig.GROUP_ID_CONFIG, "serviceName");
            config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, LongDeserializer.class);
            config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, MyClass.class);
            config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            config.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, 100_000);
            config.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300_000);
            config.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 300_000);
    
            System.setProperty("java.security.auth.login.config", ".\\src\\main\\resources\\jaas.conf");
    
            return new KafkaConsumer<>(config);
        }
    }
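
    A note on the two timeout settings added here: heartbeat.interval.ms must stay below session.timeout.ms, and the common guideline is to keep it at no more than a third of it, so the 100_000 / 300_000 pair above sits right at that boundary.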
    

    body of the polling method:

    KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
    consumer.subscribe(Collections.singletonList(inputTopic));

    while (true) {
        ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
        records.forEach(record -> {
            MyOtherClass result = myOwnKafkaService.getSomethingFromRecord(record.key(), record.value());
            System.out.println(result);
        });
        consumer.commitSync();
    }
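
    As mentioned under point 1, poll() returns an empty batch instead of throwing when it cannot reach the cluster. Here is a sketch of the kind of guard that would have surfaced my silent connection failure much sooner (the threshold of 30 consecutive empty polls is an arbitrary choice, and an @Slf4j logger on the enclosing class is assumed):

    KafkaConsumer<Long, MyClass> consumer = kafkaConfig.consumerConfigs();
    consumer.subscribe(Collections.singletonList(inputTopic));

    int emptyPolls = 0;
    try {
        while (true) {
            ConsumerRecords<Long, MyClass> records = consumer.poll(Duration.ofMillis(1000));
            if (records.isEmpty()) {
                // empty batches are normal now and then, but a long run of them
                // points at credential, connectivity or offset problems
                if (++emptyPolls % 30 == 0) {
                    log.warn("{} consecutive empty polls - check credentials and connectivity", emptyPolls);
                }
                continue;
            }
            emptyPolls = 0;
            records.forEach(record ->
                    System.out.println(myOwnKafkaService.getSomethingFromRecord(record.key(), record.value())));
            consumer.commitSync();
        }
    } finally {
        consumer.close(); // release sockets and trigger a clean group leave
    }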
    

    small example of MyClass with Deserializer:

    @Data
    @Slf4j
    public class MyClass implements Deserializer<MyClass> {
    
        @JsonProperty("UNIQUE_KEY")
        private Long uniqueKey;
        @JsonProperty("EVENT_TIMESTAMP")
        @JsonFormat(shape = JsonFormat.Shape.STRING, pattern = "yyyy-MM-dd'T'HH:mm:ss.SSS")
        private Date eventTimestamp;
        @JsonProperty("SOME_OTHER_FIELD")
        private String someOtherField;
    
        @Override
        public MyClass deserialize(String s, byte[] bytes) {
            ObjectMapper mapper = new ObjectMapper();
            MyClass event = null;
            try {
                event = mapper
                        .registerModule(new JavaTimeModule())
                        .readValue(bytes, MyClass.class);
            } catch (Exception e) {
                log.error("Something went wrong during the deserialization of the MyClass: {}", e.getMessage());
            }
            return event;
        }
    }
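
    Two design notes on the class above: a configured ObjectMapper is thread-safe and can be shared instead of being re-created on every deserialize() call, and the more common pattern keeps the Deserializer separate from the model class. A sketch of that split (MyClassDeserializer is a name made up here; it would then go into VALUE_DESERIALIZER_CLASS_CONFIG instead of MyClass):

    @Slf4j
    public class MyClassDeserializer implements Deserializer<MyClass> {

        // one shared, pre-configured mapper instead of one per record
        private static final ObjectMapper MAPPER =
                new ObjectMapper().registerModule(new JavaTimeModule());

        @Override
        public MyClass deserialize(String topic, byte[] bytes) {
            try {
                return MAPPER.readValue(bytes, MyClass.class);
            } catch (IOException e) {
                log.error("Failed to deserialize MyClass: {}", e.getMessage());
                return null; // same choice as above: log and skip rather than fail fast
            }
        }
    }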
    

    I hope this serves someone else in the future. I learned quite a bit from my setbacks and mistakes.