Testing gracePeriod behavior in KafkaStreams KStream-KStream Join with TopologyTestDriver


I'm testing the behavior of a KStream-KStream join with a grace period. The code posted here is simplified, but the core concept is the same; only the business logic differs.

I'm using Scala and Kafka (Streams) 2.4.1.

I have trouble understanding the behavior of the test, but the problem may be my understanding of the grace period.

I have an inner join written like this:

class SimpleJoin {

  private implicit val consumed: Consumed[String, String] =
    Consumed
      .`with`(Serdes.String(), Serdes.String())
      .withTimestampExtractor((record: ConsumerRecord[AnyRef, AnyRef], _: Long) => record.timestamp())
  private implicit val streamJoined: StreamJoined[String, String, String] =
    StreamJoined.`with`(Serdes.String(), Serdes.String(), Serdes.String())
  private implicit val produced: Produced[String, String] =
    Produced.`with`(Serdes.String(), Serdes.String())

  val topology: Topology = {
    val builder = new StreamsBuilder

    val left: KStream[String, String]  = builder.stream("left")
    val right: KStream[String, String] = builder.stream("right")

    left
      .join(right)(
        {
          case (leftV, rightV) => s"Left: $leftV - Right: $rightV"
        },
        JoinWindows.of(Duration.ofSeconds(4)).grace(Duration.ofSeconds(10))
      )
      .to("out")

    builder.build()
  }
}

So the window size is 4 seconds and the grace period is 10 seconds.

I set up the test like this:

      val left  = "left"
      val right = "right"
      val out   = "out"

      val props = new Properties()
      props.put(StreamsConfig.APPLICATION_ID_CONFIG, s"mocked-${UUID.randomUUID}")
      props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

      val startTime = Instant.ofEpochMilli(0L)

      val topologyTestDriver =
        new TopologyTestDriver(new SimpleJoin().topology, props, startTime)

      val leftInputTopic =
        topologyTestDriver.createInputTopic(
          left,
          StringSerde.serializer(),
          StringSerde.serializer(),
          startTime,
          Duration.ofSeconds(0L)
        )

      val rightInputTopic =
        topologyTestDriver.createInputTopic(
          right,
          StringSerde.serializer(),
          StringSerde.serializer(),
          startTime,
          Duration.ofSeconds(0L)
        )

      val outputTopic =
        topologyTestDriver.createOutputTopic(out, StringSerde.deserializer(), StringSerde.deserializer())

      leftInputTopic.pipeInput(
        "KEY",
        "LEFT_0",
        startTime.plusSeconds(0L)
      )

      rightInputTopic.pipeInput(
        "KEY",
        "RIGHT_3",
        startTime.plusSeconds(3L)
      )

      //this should join

      outputTopic.getQueueSize shouldBe 1

      leftInputTopic.pipeInput(
        "KEY",
        "LEFT_20",
        startTime.plusSeconds(20L)
      )

      // this should not join with anything and stream time of left should go to 20L

      outputTopic.getQueueSize shouldBe 1

      rightInputTopic.pipeInput(
        "KEY",
        "RIGHT_4",
        startTime.plusSeconds(4L)
      )

      // this should not join because it's too late (size 4 + grace 10, but the left stream time is 20, so the first window should be closed)

      // the assertion below fails: the queue size is 2, not 1, because RIGHT_4 is joined with the very first left record even though it shouldn't be

      outputTopic.getQueueSize shouldBe 1

      topologyTestDriver.close()

As the comments explain, I expect the last right record not to join with the very first left record because it arrives too late. For some reason the join still happens, even though the window should be closed (size + grace). One more detail: if the last record is sent at time 5 instead of 4, the join does not happen (correctly, because we are outside the JoinWindow created by the first record).
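
For reference, the join-window condition behind the timestamps 3, 4 and 5 can be sketched in plain Scala. This is a simplified model and not Kafka Streams code: `withinJoinWindow` is a hypothetical helper, timestamps are epoch milliseconds, and the bounds are assumed to be inclusive, which matches the observation that time 4 joins and time 5 does not.

```scala
import java.time.Duration

object JoinWindowSketch {
  // Hypothetical helper: true when two record timestamps (in ms) are at most
  // `timeDifference` apart, in either direction.
  def withinJoinWindow(leftTsMs: Long, rightTsMs: Long, timeDifference: Duration): Boolean =
    math.abs(leftTsMs - rightTsMs) <= timeDifference.toMillis

  def main(args: Array[String]): Unit = {
    val window = Duration.ofSeconds(4)
    println(withinJoinWindow(0L, 3000L, window)) // LEFT_0 vs RIGHT_3: true
    println(withinJoinWindow(0L, 4000L, window)) // LEFT_0 vs RIGHT_4: true
    println(withinJoinWindow(0L, 5000L, window)) // LEFT_0 vs a record at 5 s: false
  }
}
```

The sketch only covers the window itself; it says nothing about the grace period, which is the part in question.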

Am I doing something wrong? Maybe my understanding of grace period is wrong?

Thank you


Solution

  • After a lot of digging in the Kafka Streams internals I was able to understand why it doesn't work.

    Internally, there are mechanisms that determine the window size, the grace period, and when the data is actually deleted from the store.

    Briefly, and simplifying a little:

    Given a

    JoinWindows.of(Duration.ofSeconds(4)).grace(Duration.ofSeconds(10))

    We have:

    • windows.size() = 4 + 4 = 8, because JoinWindows.of(...) creates a window that joins with data both before and after the record's timestamp, so the size is doubled.
    • retentionPeriod = windows.size() + grace = 8 + 10 = 18. This is the retention of the store that holds the data to be joined.
    • The time that drives the join is not wall-clock time but stream time, which is the highest record timestamp seen so far. When a record with timestamp T arrives and T is greater than the current stream time, stream time advances to T. It only moves forward: a late event that arrives afterward with a timestamp lower than T does not change it.
    • segmentSize = max(retentionPeriod / 2, 60 seconds) = 60 seconds. This is the segment interval of the window store, not the segment size of a Kafka topic.
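
The arithmetic in the list above can be reproduced with a few lines of plain Scala. This is only a sketch of the formulas as stated here, not the library's code; the object and value names are made up for illustration.

```scala
import java.time.Duration

object RetentionSketch {
  val timeDifference: Duration = Duration.ofSeconds(4)
  val grace: Duration          = Duration.ofSeconds(10)

  // The window looks `timeDifference` before and after a record, so its size is doubled.
  val windowSizeMs: Long = 2 * timeDifference.toMillis

  // Retention of the join store: window size plus grace period.
  val retentionMs: Long = windowSizeMs + grace.toMillis

  // Segment interval as described above: max(retention / 2, 60 seconds).
  val segmentIntervalMs: Long = math.max(retentionMs / 2, 60000L)

  def main(args: Array[String]): Unit = {
    println(windowSizeMs)      // 8000
    println(retentionMs)       // 18000
    println(segmentIntervalMs) // 60000
  }
}
```

With these numbers the 60-second floor dominates: the retention would have to exceed 120 seconds before retention / 2 became the larger term.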

    segmentSize in particular is the reason the test fails.

    In this case segmentSize is 60 seconds. By design, a segment of the window store holds data and is deleted only when the latest possible entry in it has expired. Since a segment spans 60 seconds, the first segment is [0, 60), and it is deleted if and only if its latest possible entry (at 59) has expired, which happens when the stream time is greater than 59 + 18 = 77.

    We can verify this with the following test:

    leftInputTopic.pipeInput(
      "KEY",
      "LEFT_0",
      startTime.plusSeconds(0L)
    )
    
    leftInputTopic.pipeInput(
      "KEY",
      "LEFT_77",
      startTime.plusSeconds(77L) // <-------
    )
    
    rightInputTopic.pipeInput(
      "KEY",
      "RIGHT_4",
      startTime.plusSeconds(4L)
    )
    
    • LEFT_0 arrives and the stream time becomes 0.
    • Then LEFT_77 arrives and the stream time becomes 77.

    In this case RIGHT_4 joins with LEFT_0 even though, in theory, the grace period has expired. In reality the data is still available, because the first segment has not been deleted: its latest possible entry (59) has not expired yet.
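
The stream-time bookkeeping for this sequence can be sketched as a running maximum over the record timestamps. This is a simplification with a hypothetical `streamTimes` helper; how Kafka Streams actually scopes stream time (per task, per store) is not modeled here.

```scala
object StreamTimeSketch {
  // Stream time after each record: the highest timestamp seen so far (ms).
  def streamTimes(timestampsMs: Seq[Long]): Seq[Long] =
    timestampsMs.scanLeft(Long.MinValue)((current, ts) => math.max(current, ts)).tail

  def main(args: Array[String]): Unit = {
    // LEFT_0, LEFT_77, RIGHT_4
    println(streamTimes(Seq(0L, 77000L, 4000L))) // List(0, 77000, 77000)
  }
}
```

The late RIGHT_4 record leaves the stream time at 77 seconds, which is why expiry is evaluated against 77 rather than 4.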

    But if we change the timestamp of the second record to 78:

    leftInputTopic.pipeInput(
      "KEY",
      "LEFT_0",
      startTime.plusSeconds(0L)
    )
    
    leftInputTopic.pipeInput(
      "KEY",
      "LEFT_78",
      startTime.plusSeconds(78L) // <-------
    )
    
    rightInputTopic.pipeInput(
      "KEY",
      "RIGHT_4",
      startTime.plusSeconds(4L)
    )
    

    In this case, correctly, there is no join: the first segment has been deleted because the stream time is 78.
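
The difference between stream times 77 and 78 can be reproduced with a small model of segment expiry. This is a sketch under assumptions: it hardcodes the 18-second retention and 60-second segment interval from this example, the names `segmentId` and `stillInStore` are hypothetical, and it treats a segment as readable while it is not older than the segment containing (stream time - retention). It is not the Kafka Streams implementation.

```scala
object SegmentExpirySketch {
  val retentionMs: Long       = 18000L // window size (8 s) + grace (10 s)
  val segmentIntervalMs: Long = 60000L // max(retention / 2, 60 s)

  // Index of the segment that holds a given timestamp: [0, 60 s) is 0, [60 s, 120 s) is 1, ...
  def segmentId(tsMs: Long): Long = tsMs / segmentIntervalMs

  // A record stays readable as long as its whole segment is still live.
  def stillInStore(recordTsMs: Long, streamTimeMs: Long): Boolean = {
    val minLiveTsMs = math.max(0L, streamTimeMs - retentionMs)
    segmentId(recordTsMs) >= segmentId(minLiveTsMs)
  }

  def main(args: Array[String]): Unit = {
    println(stillInStore(0L, 20000L)) // true: the test from the question, LEFT_0 is still there
    println(stillInStore(0L, 77000L)) // true: 77 - 18 = 59 is still in segment 0
    println(stillInStore(0L, 78000L)) // false: 78 - 18 = 60 falls in segment 1
  }
}
```

Under this model, LEFT_0 remains joinable for any stream time up to 77 seconds, far beyond the 14 seconds that size plus grace would suggest.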

    This is why tests involving the grace period are hard to write: the grace period sets a lower bound on how long data is guaranteed to be kept, but data can be kept for much longer. To write a test around the grace period, we also have to take the Kafka Streams internals into account, namely how the store and its segments work.