I want to limit the number of message sent to the User over a given time period. I have topic of message to be sent and I can get a count of message per key (UserId) over a time window, but if I try and join that count KTable to a KStream of the original messages I get inconsistent results. It seems to work the first time I run it, but then either the count per key doesn't reset when the window expires or the count is 1 for every message with the same key. Almost as if the suppress stops working.
I wrote a Quarkus app to implement this:
@Produces
public Topology buildTopology() {
log.info("Starting topology ....");
StreamsBuilder streamsBuilder = new StreamsBuilder();
KStream<String, ContactMessage> messageKStream = streamsBuilder.stream(contactMessage,
Consumed.with(AppSerdes.String(), AppSerdes.ContactMessage()));
//.peek((key,value)-> System.out.println("Incoming record. key=" + key + " value=" + value.toString()));
KTable<Windowed<String>, Long> KT01 = messageKStream.groupByKey(Grouped.with(AppSerdes.String(),AppSerdes.ContactMessage()))
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(windowDuration),Duration.ofSeconds(gracePeriod)))
.count()
.suppress(Suppressed.untilWindowCloses(unbounded()));
KTable<String, String> KT02 = KT01.toStream()
.peek((wKey, value) -> System.out.println("Outgoing message. key=" +wKey.key() + " value=" + value +
" Window start: " + Instant.ofEpochMilli(wKey.window().start()).atOffset(ZoneOffset.UTC) +
" Window end: " + Instant.ofEpochMilli(wKey.window().end()).atOffset(ZoneOffset.UTC)))
.map((wKey, value)-> KeyValue.pair(wKey.key(), value))
.peek((k,v)-> System.out.println("After map: k="+k+" value="+v))
.filter((key,value)-> value < 3 ) // && value !=0
.map((key, value) -> KeyValue.pair(key, value.toString()))
.peek((k,v)-> System.out.println("After filter. key: "+ k +" value: "+v))
.toTable();
ValueJoiner<ContactMessage, String, String> valueJoiner = (leftValue, rightValue) -> leftValue.toString() + rightValue;
messageKStream.join(
KT02,
valueJoiner,
Joined.with(Serdes.String(),AppSerdes.ContactMessage(), Serdes.String())
).peek((k,v)-> System.out.println("Join output- "+k+" "+v)).to(outboundMessage);
return streamsBuilder.build();
I'm guessing there is a better way to do this. Any input would be appreciated. thanks
Wrong approach. Used aggregate() to resolve this. join() won't work.