Search code examples
unit-testingapache-kafkaapache-kafka-streamskafka-topicspring-kafka-test

Kafka Streams Testing : java.util.NoSuchElementException: Uninitialized topic: "output_topic_name"


I've written a test class for kafka stream application as per https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html , the code for which is

import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class KafkaStreamsConfigTest {
    
private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;

private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();

private String key="test";
private Object value = "some value";
private Object expected_value = "real value";

String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";

String applicationId = "my-app";
String test_dummy = "dummy:1234";

@Before
public void setup() {
    Topology topology = new Topology();
      
    topology.addSource(kafkaEventSourceTopic, kafkaEventSourceTopic);

    topology.addProcessor(ProcessRouter.class.getSimpleName(), ProcessRouter::new, kafkaEventSourceTopic);

    topology.addProcessor(WorkforceVisit.class.getSimpleName(), WorkforceVisit::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DefaultProcessor.class.getSimpleName(), DefaultProcessor::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(CacheWorkforceShift.class.getSimpleName(), CacheWorkforceShift::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DigitalcareShiftassisstantTracking.class.getSimpleName(), DigitalcareShiftassisstantTracking::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(WorkforceLocationUpdate.class.getSimpleName(), WorkforceLocationUpdate::new
            , ProcessRouter.class.getSimpleName());

    topology.addSink(kafkaEventSinkTopic, kafkaEventSinkTopic
            , WorkforceVisit.class.getSimpleName(), DefaultProcessor.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    topology.addSink(kafkaCacheSinkTopic, kafkaCacheSinkTopic
            , WorkforceVisit.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);       
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, test_dummy);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, EventSerde.class.getName());
        
    testDriver = new TopologyTestDriver(topology, properties);

    //setup test topics
    inputTopic = testDriver.createInputTopic(kafkaEventSourceTopic, stringSerde.serializer(), eventSerde.serializer());
    outputTopic = testDriver.createOutputTopic(kafkaEventSinkTopic, stringSerde.deserializer(), eventSerde.deserializer());

}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void outputEqualsTrue()
{
    inputTopic.pipeInput(key, value);
    Object b =  outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(b,expected_value);       
}

where I used EventSerde class to serialize and deserialize the value.

When I run this code it gives the error java.util.NoSuchElementException: Uninitialized topic: processed_events with the following stacktrace:

java.util.NoSuchElementException: Uninitialized topic: processed_events

at org.apache.kafka.streams.TopologyTestDriver.readRecord(TopologyTestDriver.java:715)
at org.apache.kafka.streams.TestOutputTopic.readRecord(TestOutputTopic.java:100)
at org.apache.kafka.streams.TestOutputTopic.readValue(TestOutputTopic.java:80)
at com.uhx.platform.eventprocessor.config.KafkaStreamsConfigTest.outputEqualsTrue(KafkaStreamsConfigTest.java:111)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

As you can see i have initialized both input and output topics. I have also debugged the code and the error occurs when i read the value from output topic

outputTopic.readValue();

I don't understand what else i should do to initialize the outputTopic. Can anyone help me with this problem?

I am using apache kafka-streams-test-utils 2.4.0 and kafka-streams 2.4.0

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>2.4.0</version>
        <scope>test</scope>
    </dependency>

Solution

  • The exception is misleading. It just means that no output record was ever written into the output topic. Hence, the problem lies with the defined topology.

    To avoid the misleading exception, you can check if your output topic is not empty before trying to read from it. That way you'll get a clearer picture:

    @Test
    public void outputEqualsTrue()
    {
        inputTopic.pipeInput(key, value);
        assert(outputTopic.isEmpty(), false);
        Object b = outputTopic.readValue();
        System.out.println(b.toString());
        assertEquals(b,expected_value);
    }