Tags: apache-kafka, kafka-consumer-api, apache-kafka-streams, avro, confluent-schema-registry

KafkaStreams cannot stream data from a topic using Avro


I am building an application to gain hands-on experience with Kafka Streams, and I have been successful so far. However, when I try to build a stream from a topic whose messages are serialized with Avro, it fails.

The consumer group ID is registered in the cluster, but it is not subscribed to any topic, as seen in the image below.

[Image: consumer group overview; the first column shows the group's number of consumers and the second the number of topics]

The code below is my configuration of the Kafka Streams application.

    public static void main(String[] args) {
        //Defining the properties for the stream
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "Test-stream-UserRegistrationServicebbb");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        //Defining the serde for the value
        Map<String, String> serdeConfig = Collections.singletonMap("schema.registry.url", "http://localhost:8081");
        SpecificAvroSerde<Pending_Registrations> pendingRegistrationsSerde = new SpecificAvroSerde<>();
        pendingRegistrationsSerde.configure(serdeConfig, false);

        StreamsBuilder builder = new StreamsBuilder();
        //Creating a stream from the topic with specific serde
        KStream<String, Pending_Registrations> userStream = builder.stream("User.Pending-Registrations",
                Consumed.with(Serdes.String(), pendingRegistrationsSerde));
        //Printing the stream
        userStream.foreach((key, value) -> System.out.println("key: " + key + " value: " + value));
        //starting the stream
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
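A note on the snippet above: it never closes the streams instance, and when nothing is printed it is hard to tell whether the application ever reaches the RUNNING state. The sketch below adds optional diagnostics using the standard KafkaStreams API, to be placed just before streams.start() (this is not part of the original code):

    // Log state transitions (e.g. REBALANCING -> RUNNING) to see whether
    // the application ever actually starts processing
    streams.setStateListener((newState, oldState) ->
            System.out.println("Streams state: " + oldState + " -> " + newState));
    // Surface exceptions that would otherwise kill stream threads quietly
    streams.setUncaughtExceptionHandler(exception -> {
        exception.printStackTrace();
        return StreamsUncaughtExceptionHandler.StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
    });
    // Close the streams instance cleanly when the JVM exits
    Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    streams.start();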

The Pending_Registrations class is generated from an Avro schema (specifically from a .avsc file).
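The actual schema isn't included here, but for context, a minimal .avsc for such a record might look like the following (the namespace and fields are hypothetical, purely for illustration):

    {
      "type": "record",
      "name": "Pending_Registrations",
      "namespace": "com.example.registration",
      "doc": "Hypothetical illustration; the real fields are not shown in this post.",
      "fields": [
        { "name": "userId", "type": "string" },
        { "name": "email", "type": "string" },
        { "name": "requestedAt", "type": "long" }
      ]
    }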

All services except the Java applications are deployed with Docker on the following ports: ZooKeeper on 2181, Kafka on 9092, Schema Registry on 8081, and Confluent Control Center on 9021.

The application compiles and runs without any errors or crashes; it just doesn't print anything, nor does it find any topic.

I'd really appreciate any help, since I've spent the last 5 hours trying to figure out what I have missed. :)

I have tried following multiple guides to find any discrepancies between their code and mine, but unfortunately their solutions did not seem to work in my case.

An ordinary consumer works perfectly fine with Avro and is able to deserialize all of the stored messages:

    public static void main(String[] args) {
        Properties streamConfig = new Properties();
        streamConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "Test-normal-UserRegistrationService");
        streamConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        streamConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, Serdes.String().deserializer().getClass());
        streamConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);
        streamConfig.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "http://localhost:8081");
        streamConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        KafkaConsumer<String, Pending_Registrations> consumer = new KafkaConsumer<>(streamConfig);
        consumer.subscribe(java.util.Collections.singletonList("User.Pending-Registrations"));
        while (true) {
            consumer.poll(java.time.Duration.ofMillis(100)).forEach(record -> {
                System.out.println("key: " + record.key() + " value: " + record.value());
            });
        }
    }
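One detail worth flagging in the consumer above: with Confluent's KafkaAvroDeserializer, values come back as GenericRecord unless specific.avro.reader is enabled, so deserializing into the generated Pending_Registrations class normally also requires this standard Confluent setting (not shown in the original snippet; if the consumer really does return the generated class as described, it is presumably already set somewhere):

    // Make KafkaAvroDeserializer return the generated SpecificRecord class
    // (Pending_Registrations) instead of a GenericRecord
    streamConfig.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);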

No errors are thrown even when I comment out the schema registry URL configs (the serdeConfig).


Solution

  • Thanks a lot to @OneCricketeer for helping me find what I had overlooked.

    It turns out that the Kafka Streams and Confluent Platform versions I was using were not compatible. After switching to confluentinc/cp-server:7.4.0 and 'kafka-streams', version: '3.4.1', the issue was resolved and the Kafka Streams application could start streaming.
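    For anyone hitting the same mismatch, the dependency side of that fix would look roughly like this in Gradle (the kafka-streams-avro-serde coordinate and its version are my assumption, chosen to match Confluent Platform 7.4.x):

        repositories {
            mavenCentral()
            // Confluent artifacts are hosted in Confluent's own Maven repository
            maven { url 'https://packages.confluent.io/maven/' }
        }

        dependencies {
            // Kafka Streams version aligned with the Kafka 3.4.x inside cp-server:7.4.0
            implementation 'org.apache.kafka:kafka-streams:3.4.1'
            // Avro serde for Kafka Streams (assumed 7.4.0 to match the platform)
            implementation 'io.confluent:kafka-streams-avro-serde:7.4.0'
        }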