Search code examples
javaunit-testingapache-kafkaapache-kafka-streams

Kafka GroupTable tests generating extra messages when using ProcessorTopologyTestDriver


I've written a stream that takes in messages and sends out a table of the keys that have appeared. If something appears, it will show a count of 1. This is a simplified version of my production code in order to demonstrate the bug. In a live run, a message is sent out for each message received.

However, when I run it in a unit test using ProcessorTopologyTestDriver, I get a different behavior. If a key that has already been seen before is received, I get an extra message.

If I send messages with keys "key1", then "key2", then "key1", I get the following output.

key1 - 1
key2 - 1
key1 - 0
key1 - 1

For some reason, it decrements the value before adding it back in. This only happens when using ProcessorTopologyTestDriver. Is this expected? Is there a work around? Or is this a bug?

Here's my topology:

final StreamsBuilder builder = new StreamsBuilder();
    KGroupedTable<String, String> groupedTable
            = builder.table(applicationConfig.sourceTopic(), Consumed.with(Serdes.String(), Serdes.String()))
            .groupBy((key, value) -> KeyValue.pair(key, value), Serialized.with(Serdes.String(), Serdes.String()));

    KTable<String, Long> countTable = groupedTable.count();

    KStream<String, Long> countTableAsStream = countTable.toStream();
    countTableAsStream.to(applicationConfig.outputTopic(), Produced.with(Serdes.String(), Serdes.Long()));

Here's my unit test code:

TopologyWithGroupedTable top = new TopologyWithGroupedTable(appConfig, map);
    Topology topology = top.get();
    ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, topology);
    driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());
    driver.process(inputTopic, "key2", "theval", Serdes.String().serializer(), Serdes.String().serializer());
    driver.process(inputTopic, "key1", "theval", Serdes.String().serializer(), Serdes.String().serializer());

    ProducerRecord<String, Long> outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key1", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value());
    outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key2", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value());
    outputRecord = driver.readOutput(outputTopic, keyDeserializer, valueDeserializer);
    assertEquals("key1", outputRecord.key());
    assertEquals(Long.valueOf(1L), outputRecord.value()); //this fails, I get 0.  If I pull another message, it shows key1 with a count of 1

Here's a repo of the full code:

https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/

Stream topology: https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/main/java/com/nick/kstreams/TopologyWithGroupedTable.java

Test code: https://bitbucket.org/nsinha/testtopologywithgroupedtable/src/master/src/test/java/com/nick/kstreams/TopologyWithGroupedTableTests.java


Solution

  • Update

    In Kafka 3.5, this behavior was improved via KIP-904. If the adder and subtractor are called for the same key from single upstream update (what is not the general case, for which substractor and adder are called for different keys for a single upstream update), only a single result record is emitted now.

    Original Answer

    It's not a bug, but behavior by design (c.f. explanation below).

    The difference in behavior is due to KTable state store caching (cf. https://docs.confluent.io/current/streams/developer-guide/memory-mgmt.html). When you run the unit test, the cache is flushed after each record, while in your production run, this is not the case. If you disable caching in your production run, I assume that it behaves the same as in your unit test.

    Side remark: ProcessorTopologyTestDriver is an internal class and not part of public API. Thus, there is no compatibility guarantee. You should use the official unit-test packages instead: https://docs.confluent.io/current/streams/developer-guide/test-streams.html

    Why do you see two records:

    In your code, you are using a KTable#groupBy() and in your specific use case, you don't change the key. However, in general, the key might be changed (depending on the value of the input KTable. Thus, if the input KTable is changed, the downstream aggregation needs to remove/subtract the old key-value pair from the aggregation result, and add the new key-value pair to the aggregation result—in general, the key of the old and new pair are different and thus, it's required to generate two records because the subtraction and addition could happen on different instances as different keys might be hashed differently. Does this make sense?

    Thus, for each update of the input KTable, two updates two the result KTable on usually two different key-value pairs need to be computed. For you specific case, in which the key does not change, Kafka Stream does the same thing (there is no check/optimization for this case to "merge" both operations into one if the key is actually the same).