KafkaStreams doesn't accept my serde in StreamsConfig


I am trying to merge data by id (the id is a String in the objects) from 2 topics and write the merged data to another topic. My topology has no Properties configuration yet, since I am not running it remotely. I am just trying to test it in a unit test with Mockito and JUnit 5.

@BeforeEach
void setUp() {
    testDriver = new TopologyTestDriver(myTopology.buildTopology());

    Serde<MyObject1> myObject1Serde = new JsonSerde<>(MyObject1.class);
    Serde<MyObject2> myObject2Serde = new JsonSerde<>(MyObject2.class);
    Serde<MyObject3> myObject3Serde = new JsonSerde<>(MyObject3.class);

    // Create the input topics with their serializers, and the output topic with its deserializers, like this:
    myObject1Topic = testDriver.createInputTopic(
        MyTopology.FIRST_INPUT_TOPIC,
        Serdes.String().serializer(),
        myObject1Serde.serializer()
    );

    // ... same for the other 2 objects
}

When I run that, it gives me this error:

16:00:02.424 [main] WARN org.apache.kafka.streams.processor.internals.StateDirectory -- Using an OS temp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS. Resolved state.dir: [/var/folders/dn/f006xw_d4_7f9p5bbywpdxyh0000gp/T//kafka-streams]

org.apache.kafka.common.config.ConfigException: Please specify a value serde or set one through StreamsConfig#DEFAULT_VALUE_SERDE_CLASS_CONFIG
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:1769)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.valueSerde(AbstractProcessorContext.java:100)
    at org.apache.kafka.streams.processor.internals.SerdeGetter.valueSerde(SerdeGetter.java:51)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareSerde(WrappingNullableUtils.java:63)
    at org.apache.kafka.streams.kstream.internals.WrappingNullableUtils.prepareValueSerde(WrappingNullableUtils.java:94)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.prepareValueSerde(MeteredWindowStore.java:145)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.initStoreSerde(MeteredWindowStore.java:171)
    at org.apache.kafka.streams.state.internals.MeteredWindowStore.init(MeteredWindowStore.java:134)
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:219)
    at org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:99)
    at org.apache.kafka.streams.processor.internals.StreamTask.initializeIfNeeded(StreamTask.java:227)
    at org.apache.kafka.streams.TopologyTestDriver.setupTask(TopologyTestDriver.java:525)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:372)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:299)
    at org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:275)
    at com.overspentblocker.overspentblocker.topology.OverspentTopologyTest.setUp(MyTopologyTest.java:46)

I assume that it is related to my topology. I am still trying to understand Serdes, so thanks in advance for your help.

I was trying to test my topology, expecting the data to be merged so that I could assert on the result.


Solution

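  • Most likely you haven't specified a default value Serde in the StreamsConfig. You have defined Serdes for MyObject1, MyObject2, and MyObject3 in the test, but those are only used by the test input and output topics. Whenever an operation inside the topology is not given an explicit Serde, Kafka Streams falls back to the default one from the configuration, and throws this ConfigException if none is set. The stack trace passes through MeteredWindowStore, so the operation in question is probably a windowed one such as the join; passing the Serdes to that operation directly (for example with StreamJoined.with(...) on a stream-stream join) is an alternative to setting defaults. Even if you're not running against a remote cluster, you can still pass local properties to the test driver; something like this might help: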

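    Properties props = new Properties();
    // Placeholder values; TopologyTestDriver does not connect to a broker.
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app-id");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    // Fallback Serdes; they must match the types the topology actually stores.
    props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

    // Pass the properties to the driver, otherwise they are never used.
    testDriver = new TopologyTestDriver(myTopology.buildTopology(), props);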
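
If several test classes need the same settings, one option is to keep them in a small helper. The following is only a sketch: the class name TestStreamsProps and the method testProps are hypothetical, and the literal keys are the string values behind the StreamsConfig constants, used here so the snippet compiles without Kafka on the classpath.

```java
import java.util.Properties;

public class TestStreamsProps {

    // Binary name of the nested class Serdes.StringSerde from kafka-clients.
    static final String STRING_SERDE =
        "org.apache.kafka.common.serialization.Serdes$StringSerde";

    /** Builds the configuration shared by tests that create a test driver. */
    static Properties testProps(String applicationId) {
        Properties props = new Properties();
        props.put("application.id", applicationId);
        // Placeholder address; the test driver does not connect to a broker.
        props.put("bootstrap.servers", "localhost:9092");
        // Fallback Serdes for operations that do not specify their own.
        props.put("default.key.serde", STRING_SERDE);
        props.put("default.value.serde", STRING_SERDE);
        return props;
    }

    public static void main(String[] args) {
        // Print the resulting configuration for a quick visual check.
        testProps("my-app-id").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

In the test, the result would be passed as the second constructor argument, e.g. new TopologyTestDriver(myTopology.buildTopology(), TestStreamsProps.testProps("my-app-id")). In real test code the StreamsConfig constants are preferable to the literal keys.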