
Kafka Stream - Filter by client_id


I'm using Kafka Streams to create a KTable that contains only the data for a specific client_id, which is not the topic key. I'm new to Kafka Streams. It seems pretty straightforward, but I got a bit confused by the many examples available in the community, which are really good.

I'm trying to get the inputTopic records that have client_id=0123456. In KSQL, the equivalent command would be something like:

CREATE STREAM TOPIC1_CLIENT1 AS
SELECT * FROM TOPIC1
WHERE client_id= '0123456'
EMIT CHANGES;

Below I'm trying to reproduce the same behavior. Can someone please tell me what I'm doing wrong? It's not filtering as I expect.

        final KStream<String, String> stream = builder.stream(inputTopic, Consumed.with(stringSerde, stringSerde));
        final KTable<String, String> convertedTable = stream.filter((client_id,v) -> v.equals("0123456")).toTable(Materialized.as("stream-converted-to-table"));
        stream.to(streamsOutputTopic, Produced.with(stringSerde, stringSerde));
        convertedTable.toStream().to(tableOutputTopic, Produced.with(stringSerde, stringSerde));

Solution

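  • In the filter lambda, the first parameter is the record key (naming it client_id does not change that) and v is the entire value of the message. KSQL can refer to named fields because the stream has an associated schema, whether the data is JSON or Avro, for example. This means client_id is only one part of the value, so v.equals("0123456") matches only when the whole value is exactly that string. To filter on client_id, deserialize or parse the value first and compare the extracted field.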
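
As a rough illustration of that point, here is a minimal sketch of a predicate that pulls client_id out of the value before comparing it. It assumes the values are JSON strings with a top-level string field such as {"client_id":"0123456", ...}, which the question does not confirm. ClientIdFilter and hasClientId are hypothetical names, not part of Kafka Streams, and the regular expression is a deliberately naive stand-in for a real JSON parser or a JSON/Avro serde.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the Kafka Streams API.
final class ClientIdFilter {

    // Naive pattern for a string field "client_id" inside a JSON value.
    // Assumption: values look like {"client_id":"0123456", ...}.
    private static final Pattern CLIENT_ID =
            Pattern.compile("\"client_id\"\\s*:\\s*\"([^\"]*)\"");

    private ClientIdFilter() {}

    // Returns true only when the value contains a client_id equal to `expected`.
    static boolean hasClientId(String value, String expected) {
        if (value == null) {
            return false; // records with a null value carry no client_id
        }
        Matcher m = CLIENT_ID.matcher(value);
        return m.find() && m.group(1).equals(expected);
    }
}

// Possible use in the topology from the question (same variable names assumed):
// stream.filter((key, value) -> ClientIdFilter.hasClientId(value, "0123456"))
//       .toTable(Materialized.as("stream-converted-to-table"));
```

With this in place, the predicate compares the extracted field rather than the whole payload. In a real application, configuring a serde that deserializes the value into an object and filtering on its client_id property would likely be more robust than matching text.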