I have a DB that stores pageviews per webpage. It does that by consuming a Kafka topic named pageviews, where each message has the page name as the key and the number of views since the previous message as the value.
This is a sample of the messages expected in the pageviews topic:
pageviews topic:
key: "index", value: 349
key: "products", value: 67
key: "index", value: 15
key: "about", value: 11
...
The consumer of pageviews adds each of these values to the PAGEVIEWS table.
Now I am building the producer of the pageviews topic. The data source of this application is the viewstream topic, where one message is created per view, like:
viewstream topic:
key: "index", value: <timestamp>
key: "index", value: <timestamp>
key: "product", value: <timestamp>
...
In the Kafka Streams application, I have the following topology:
PageViewsStreamer:
builder.stream("viewstream")
.groupByKey()
.aggregate(...) // this builds a KTable with the sums of views per page
.toStream()
.to("pageviews");
I have two problems with this topology:
1. The KTable that holds the aggregations does not get reset/purged after the output message is produced to pageviews, so simply adding the aggregated value to the DB table gives wrong results. How can I make each message sent to pageviews exclude views that were already sent in previous messages?
2. I want pageviews messages to be sent once every 15 minutes (the default rate is about once every 30 seconds).
I have been trying to use windowing for both, but without success so far.
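To make problem 1 concrete, here is a small pure-Java simulation of the current topology (no Kafka dependency; the `RunningTotalDemo` class and its methods are hypothetical). It assumes record caching is disabled, so every KTable update is forwarded to pageviews. Because each forwarded value is the new running total rather than a delta, a consumer that adds every value double-counts. With caching enabled, fewer intermediate totals are emitted, but each one is still cumulative, so the error remains.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Pure-Java simulation (NOT Kafka Streams code) of the non-windowed topology,
// assuming record caching is disabled so every KTable update is forwarded.
class RunningTotalDemo {

    // Mimics groupByKey().aggregate(...): the KTable keeps a running count per page
    // and forwards each update as the new TOTAL, not as a delta.
    static List<Map.Entry<String, Long>> emittedUpdates(List<String> viewKeys) {
        Map<String, Long> ktable = new LinkedHashMap<>();
        List<Map.Entry<String, Long>> out = new ArrayList<>();
        for (String page : viewKeys) {
            long total = ktable.merge(page, 1L, Long::sum);
            out.add(Map.entry(page, total));
        }
        return out;
    }

    // Mimics the pageviews consumer: it ADDS every received value to the stored count.
    static Map<String, Long> consumerTable(List<Map.Entry<String, Long>> messages) {
        Map<String, Long> table = new LinkedHashMap<>();
        for (Map.Entry<String, Long> m : messages) {
            table.merge(m.getKey(), m.getValue(), Long::sum);
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> views = List.of("index", "index", "index", "about");
        // "index" is viewed 3 times, but the consumer stores 1 + 2 + 3 = 6.
        System.out.println(consumerTable(emittedUpdates(views))); // {index=6, about=1}
    }
}
```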
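You can achieve this behavior by using 15-minute tumbling windows and suppressing the results until each window closes (remember to set a grace period, which bounds how late an event can arrive and still be accepted into its window). Windows close based on stream time, i.e. the highest record timestamp the task has seen so far, not wall-clock time, so a window's final result is emitted only after a newer record pushes stream time past the window end plus the grace period. View details here. I would do something like this: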
builder.stream("viewstream")
.groupByKey()
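// 15-minute tumbling windows that accept events up to 30 seconds late (the grace period can be smaller)
// (on Kafka Streams 3.0+, TimeWindows.ofSizeAndGrace(...) replaces the deprecated of(...).grace(...))
.windowedBy(TimeWindows.of(Duration.ofMinutes(15)).grace(Duration.ofSeconds(30)))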
.aggregate(...) // this builds a windowed KTable with the number of views per page in each window
.suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()))
.toStream()
// re-key from Windowed<String> back to the plain page name
.selectKey((key, value) -> key.key())
.to("pageviews");
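Below is a rough pure-Java simulation of the windowed, suppressed topology above (no Kafka dependency; the class, record and method names are hypothetical, and the close rule "stream time reaches window end + grace" is my reading of Kafka Streams' semantics, not its implementation). It illustrates why the consumer's additive logic becomes correct: each window's count is emitted exactly once, after the window closes, so every pageviews message is a delta. Records require Java 16+.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Pure-Java simulation (NOT Kafka Streams code) of the answer's topology:
// 15-minute tumbling windows, 30 s grace, suppress(untilWindowCloses(...)).
class SuppressedWindowSim {
    static final long SIZE_MS = Duration.ofMinutes(15).toMillis();
    static final long GRACE_MS = Duration.ofSeconds(30).toMillis();

    record View(String page, long timestampMs) {}
    record WindowedCount(String page, long windowStartMs, long count) {}

    // Tumbling windows are aligned to the epoch: [start, start + size).
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % SIZE_MS);
    }

    // Assumed rule: a window closes once stream time reaches window end + grace.
    static boolean isClosed(long windowStartMs, long streamTimeMs) {
        return streamTimeMs >= windowStartMs + SIZE_MS + GRACE_MS;
    }

    static List<WindowedCount> run(List<View> views) {
        // Open windows: window start -> (page -> views counted so far).
        TreeMap<Long, Map<String, Long>> open = new TreeMap<>();
        List<WindowedCount> emitted = new ArrayList<>();
        long streamTime = Long.MIN_VALUE;
        for (View v : views) {
            // Stream time only moves forward, driven by record timestamps.
            streamTime = Math.max(streamTime, v.timestampMs());
            long start = windowStart(v.timestampMs());
            if (!isClosed(start, streamTime)) {
                open.computeIfAbsent(start, k -> new TreeMap<>()).merge(v.page(), 1L, Long::sum);
            } // else: the record is later than the grace period allows and is dropped
            // Emit each closed window exactly once, then forget it (nothing is re-sent).
            while (!open.isEmpty() && isClosed(open.firstKey(), streamTime)) {
                Map.Entry<Long, Map<String, Long>> w = open.pollFirstEntry();
                w.getValue().forEach((page, c) -> emitted.add(new WindowedCount(page, w.getKey(), c)));
            }
        }
        return emitted; // windows that have not closed yet stay buffered
    }

    // What the pageviews consumer stores when it adds every received count.
    static Map<String, Long> consumerTotals(List<WindowedCount> emitted) {
        Map<String, Long> table = new TreeMap<>();
        for (WindowedCount wc : emitted) {
            table.merge(wc.page(), wc.count(), Long::sum);
        }
        return table;
    }

    public static void main(String[] args) {
        long min = 60_000L;
        List<View> views = List.of(
                new View("index", 1 * min),
                new View("index", 2 * min),
                new View("about", 10 * min),
                new View("index", 16 * min),  // stream time passes 15:30: window [0, 15) is emitted
                new View("index", 31 * min)); // passes 30:30: window [15, 30) is emitted
        List<WindowedCount> emitted = run(views);
        emitted.forEach(System.out::println);
        // The view at 31:00 is still buffered in window [30, 45).
        System.out.println(consumerTotals(emitted)); // {about=1, index=3}
    }
}
```

Note the last view in the example: since suppression is driven by stream time, the most recent window reaches pageviews only once a later record arrives, so a quiet topic can hold back the final result.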