I am trying to test a stream-stream join with TopologyTestDriver. My goal is to confirm, without running external services, that my topology performs the following left join correctly.
case (billValue, null) => billValue
case (billValue, paymentValue) => (billValue.toInt - paymentValue.toInt).toString
In other words, if we see a bill and a payment within 100ms, the payment should be subtracted from the bill. If we do not see a payment, the debt is simply the bill.
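The joiner logic on its own can be checked without any Kafka machinery. The following is only a minimal sketch: the names DebtJoiner and debt are hypothetical (they are not taken from the linked repository), and it wraps the possibly-null payment in an Option instead of matching on null directly.

```scala
object DebtJoiner {
  // Hypothetical helper mirroring the two cases above.
  // In a left join, paymentValue is null when no payment matched the bill.
  def debt(billValue: String, paymentValue: String): String =
    Option(paymentValue) match {
      case None          => billValue
      case Some(payment) => (billValue.toInt - payment.toInt).toString
    }
}
```

For example, DebtJoiner.debt("100", "95") gives "5", and DebtJoiner.debt("20", null) gives "20".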
Here is the test code.
val simpleLeftJoinTopology = new SimpleLeftJoinTopology
val driver = new TopologyTestDriver(simpleLeftJoinTopology.topology)
val serde = Serdes.stringSerde
val bills = driver.createInputTopic("bills", serde.serializer, serde.serializer)
val payments = driver.createInputTopic("payments", serde.serializer, serde.serializer)
val debt = driver.createOutputTopic("debt", serde.deserializer, serde.deserializer)
bills.pipeInput("fred", "100")
bills.pipeInput("george", "20")
payments.pipeInput("fred", "95")
// When in doubt, sleep twice
val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
// This record is present
new KeyValue[String, String]("fred", "5"),
// This record is missing
new KeyValue[String, String]("george", "20")
)
Full code available at https://github.com/Oduig/kstreams-left-join-example
Is it possible to test a left join this way?
I found out what the problem was. Kafka Streams works with "stream time", which is based on the event time of the records it processes. The event time can come from multiple sources. The input topic created by createInputTopic takes a fixed timestamp, which can optionally be incremented after every record by passing autoAdvance.
My code was working at runtime because Kafka uses wall clock time by default, but the test topics do not. This means that the so-called "stream time" was frozen in my test, and the window for "george" was never closed.
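To make the "frozen stream time" idea concrete, here is a toy model in plain Scala. It is not Kafka code. The names StreamTimeModel, Clock and windowClosed are made up for illustration, and the closing rule (stream time strictly greater than event time plus window size, with no grace period) is an assumption of the sketch rather than a statement about Kafka's exact internals.

```scala
object StreamTimeModel {
  // Toy model: stream time is the highest event timestamp observed so far.
  // It only moves when a record with a later timestamp arrives.
  final case class Clock(streamTime: Long = Long.MinValue) {
    def observe(eventTime: Long): Clock =
      Clock(math.max(streamTime, eventTime))
  }

  // Assumption of this sketch: a window of windowMs with no grace period counts
  // as closed once stream time has moved past eventTime + windowMs.
  def windowClosed(clock: Clock, eventTime: Long, windowMs: Long): Boolean =
    clock.streamTime > eventTime + windowMs

  val t0 = 0L

  // Three records with the same timestamp, as in the failing test.
  val frozen: Clock = Seq(t0, t0, t0).foldLeft(Clock())(_.observe(_))

  // One extra record 101 ms later moves stream time forward.
  val advanced: Clock = frozen.observe(t0 + 101)
}
```

In this model, windowClosed(frozen, t0, 100) is false and windowClosed(advanced, t0, 100) is true, which matches the behaviour I saw: the unmatched "george" bill only comes out once a later record has been seen.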
We can set the time explicitly (or use autoAdvance) to trigger the correct behavior.
val t0: Instant = Instant.now()
bills.pipeInput(new TestRecord("fred", "100", t0))
bills.pipeInput(new TestRecord("george", "20", t0))
payments.pipeInput(new TestRecord("fred", "95", t0))
// Sending an extra record with a later event time
payments.pipeInput(new TestRecord("percy", "0", t0.plusMillis(101)))
val keyValues = debt.readKeyValuesToList()
keyValues should contain theSameElementsAs Seq(
new KeyValue[String, String]("fred", "5"),
new KeyValue[String, String]("george", "20")
)
The test now passes. Note that there is no way to advance stream time without sending a new record to the topic. This means that if the data is very sparse, it can take a long time (on the wall clock) for records to be emitted.