java apache-kafka apache-kafka-streams kafka-join

How to use Kafka Streams to process only messages sent fewer than 3 times per key (UserId) within a given time window


I want to limit the number of messages sent to a user over a given time period. I have a topic of messages to be sent, and I can get a count of messages per key (UserId) over a time window, but when I join that count KTable to a KStream of the original messages I get inconsistent results. It seems to work the first time I run it, but after that either the count per key doesn't reset when the window expires, or the count is 1 for every message with the same key. It is almost as if the suppress stops working.

I wrote a Quarkus app to implement this:

    @Produces
    public Topology buildTopology() {
        log.info("Starting topology ....");

        StreamsBuilder streamsBuilder = new StreamsBuilder();

        KStream<String, ContactMessage> messageKStream = streamsBuilder.stream(contactMessage,
                Consumed.with(AppSerdes.String(), AppSerdes.ContactMessage()));
                        //.peek((key,value)-> System.out.println("Incoming record. key=" + key + " value=" + value.toString()));

        KTable<Windowed<String>, Long> KT01 = messageKStream.groupByKey(Grouped.with(AppSerdes.String(),AppSerdes.ContactMessage()))
                .windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(windowDuration),Duration.ofSeconds(gracePeriod)))
                .count()
                .suppress(Suppressed.untilWindowCloses(unbounded()));

        KTable<String, String> KT02 = KT01.toStream()
                .peek((wKey, value) -> System.out.println("Outgoing message. key=" +wKey.key() + " value=" + value +
                        " Window start: " + Instant.ofEpochMilli(wKey.window().start()).atOffset(ZoneOffset.UTC) +
                        " Window end: " + Instant.ofEpochMilli(wKey.window().end()).atOffset(ZoneOffset.UTC)))
                .map((wKey, value)-> KeyValue.pair(wKey.key(), value))
                .peek((k,v)-> System.out.println("After map: k="+k+" value="+v))
                .filter((key,value)-> value < 3 )    // && value !=0
                .map((key, value) -> KeyValue.pair(key, value.toString()))
                .peek((k,v)-> System.out.println("After filter. key: "+ k +" value: "+v))
                .toTable();

        ValueJoiner<ContactMessage, String, String> valueJoiner = (leftValue, rightValue) -> leftValue.toString() + rightValue;
        messageKStream.join(
                KT02,
                valueJoiner,
                Joined.with(Serdes.String(),AppSerdes.ContactMessage(), Serdes.String())
        ).peek((k,v)-> System.out.println("Join output- "+k+" "+v)).to(outboundMessage);
        return streamsBuilder.build();
    }

I'm guessing there is a better way to do this. Any input would be appreciated. Thanks.


Solution

  • The join() approach was the wrong one and won't work for this. I resolved it by using aggregate() instead.
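
The answer does not show the aggregate() code, so the following is only a plain-Java sketch of the per-key, per-window counting idea, not the poster's actual topology and not Kafka Streams API. The class name WindowedLimiter, the method allow, and the parameters windowSizeMs and limit are all hypothetical. It assumes tumbling windows aligned to the epoch and timestamps that do not go backwards for a given key. In a Kafka Streams application, state like this would presumably live in the aggregate of a windowed aggregate() call rather than in an in-memory map.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper (not part of Kafka Streams): models the state that a
// per-key windowed aggregate would need in order to cap messages per user.
final class WindowedLimiter {
    private final long windowSizeMs;
    private final int limit;
    // userId -> { start of the current window, messages seen in that window }
    private final Map<String, long[]> state = new HashMap<>();

    WindowedLimiter(long windowSizeMs, int limit) {
        this.windowSizeMs = windowSizeMs;
        this.limit = limit;
    }

    // Returns true if the message should be forwarded, false if the user
    // has already reached the limit in the window containing timestampMs.
    // Assumption: timestamps for a given user are non-decreasing.
    boolean allow(String userId, long timestampMs) {
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        long[] entry = state.get(userId);
        if (entry == null || entry[0] != windowStart) {
            // First message for this user, or a new window: reset the count.
            entry = new long[] {windowStart, 0};
            state.put(userId, entry);
        }
        entry[1]++;
        return entry[1] <= limit;
    }
}
```

With new WindowedLimiter(60_000L, 2), the first two calls to allow for the same user within one minute return true and a third returns false. A call whose timestamp falls in the next minute returns true again, because the count resets with the window. That reset is the behavior the join-based version failed to deliver reliably.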