Purge KTable entries after sending values to output topic


I have a DB that stores pageviews per webpage. It does that by consuming a Kafka topic named pageviews, where each message has the page name as the key and the number of views since the previous message as the value.

This is a sample of the messages that are expected in pageviews topic:

pageviews topic:

key: "index", value: 349
key: "products", value: 67
key: "index", value: 15
key: "about", value: 11
...

The consumer of pageviews adds each incoming value to the running total for that page in the PAGEVIEWS table.

Now I am building the producer for the pageviews topic. The data source of this application is the viewstream topic, where one message is created per view, like:

viewstream topic:

key: "index", value: <timestamp>
key: "index", value: <timestamp>
key: "products", value: <timestamp>
...

In the Kafka Streams application I have the following topology:

PageViewsStreamer:

builder.stream("viewstream")
    .groupByKey()
    .aggregate(...) // this builds a KTable with the sums of views per page
    .toStream()
    .to("pageviews")

I have 2 problems with this topology:

  1. The KTable that holds the aggregations is not reset/purged after the output message is produced to pageviews, so simply adding the aggregated value to the DB table gives wrong results. How can I make sure each message sent to pageviews excludes views that were already sent in previous messages?

  2. I want pageviews messages to be sent once every 15 minutes (the default rate is about every 30 seconds).
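
To make problem 1 concrete, here is a minimal plain-Java sketch with no Kafka involved. The class and method names are hypothetical, and the numbers are made up for illustration. It mimics a consumer that adds every received value to a per-page total, first fed with running totals (what a non-windowed aggregate emits) and then with per-interval deltas (what the consumer actually expects).

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class RunningTotalDoubleCount {

    // Hypothetical stand-in for the DB consumer: adds every received value to the page's total.
    static Map<String, Long> consume(List<Map.Entry<String, Long>> messages) {
        Map<String, Long> table = new HashMap<>();
        for (Map.Entry<String, Long> m : messages) {
            table.merge(m.getKey(), m.getValue(), Long::sum);
        }
        return table;
    }

    public static void main(String[] args) {
        // 3 views arrive and the aggregate emits 3; 2 more arrive and it emits the running total 5.
        List<Map.Entry<String, Long>> runningTotals =
                List.of(Map.entry("index", 3L), Map.entry("index", 5L));
        // What the consumer needs instead: only the views since the previous message.
        List<Map.Entry<String, Long>> deltas =
                List.of(Map.entry("index", 3L), Map.entry("index", 2L));

        System.out.println(consume(runningTotals)); // {index=8}, although only 5 views happened
        System.out.println(consume(deltas));        // {index=5}
    }
}
```

The first result overcounts because the second message already contains the three views reported by the first one.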

I am trying to work with windowing for both, but I have failed so far.


Solution

  • You can achieve this behavior using 15-minute tumbling windows and suppressing the results until each window has closed (remember to add a grace period to bound how late an event can arrive and still be accepted into the previous window). I would do something like this:

    builder.stream("viewstream")
        .groupByKey()
        // 15-minute tumbling windows; accept records up to 30 seconds late (the grace period can be shorter)
        .windowedBy(TimeWindows.of(Duration.ofMinutes(15)).grace(Duration.ofSeconds(30)))
        .aggregate(...) // this builds a windowed KTable with the sums of views per page and window
        // emit only the final result of each window, once the window has closed
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
        .toStream()
        // re-key: from the windowed key back to the plain page name
        .selectKey((key, value) -> key.key())
        .to("pageviews");
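
The reason windowing solves the first problem is that each window keeps its own count, so a result never includes views from an earlier window. The following plain-Java sketch (no Kafka dependency; the class name, method names, and the "page@windowStart" key format are hypothetical) illustrates the bucketing idea: tumbling windows are aligned to the epoch, so a timestamp maps to its window by rounding down to a multiple of the window size. It assumes non-negative timestamps and ignores grace periods and suppression.

```java
import java.time.Duration;
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    static final long WINDOW_MS = Duration.ofMinutes(15).toMillis();

    // Round the timestamp down to the start of its 15-minute window (assumes timestampMs >= 0).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Counts views per (page, window); every count covers only its own window.
    static Map<String, Long> countPerWindow(String[] pages, long[] timestampsMs) {
        Map<String, Long> counts = new TreeMap<>();
        for (int i = 0; i < pages.length; i++) {
            String windowedKey = pages[i] + "@" + windowStart(timestampsMs[i]);
            counts.merge(windowedKey, 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long minute = Duration.ofMinutes(1).toMillis();
        String[] pages = {"index", "index", "products", "index"};
        long[] timestamps = {1 * minute, 14 * minute, 14 * minute, 16 * minute};

        // prints {index@0=2, index@900000=1, products@0=1}
        System.out.println(countPerWindow(pages, timestamps));
    }
}
```

The view at minute 16 lands in the second window and is counted separately, which is the per-interval delta the PAGEVIEWS consumer expects to add.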