Tags: c#, .net, apache-kafka, ksqldb

Is there a way to consume a ksqlDB push query from .NET?


I'm processing a large volume of Kafka messages in .NET with a Kafka consumer at the moment.

Step 1 in my processing is to parse the JSON and discard many of the messages based on the value of a specific field in the JSON.

I'd like to not process (and specifically, not download) those unwanted messages in the first place.

It looks like a ksqlDB query, written as a push query, can effectively filter the stream down to just the messages I need to process.

How can I consume these from .NET, though? I saw some documentation mentioning a REST API, but I doubt that is a good idea: I need to process in excess of 100,000 records per minute at peak times of day. (If I can selectively download and process messages, I will only be processing about one third of the current volume.)

Unfortunately I don't have control over the publisher, so I can't change what/how the messages are published.


Solution

  • Yes, you can use ksqlDB to do this:

    -- Declare a stream on the source topic
    -- Because it's JSON you'll need to specify the schema
    CREATE STREAM my_source (COL1 VARCHAR, COL2 INT) 
      WITH (KAFKA_TOPIC='my_source_topic', VALUE_FORMAT='JSON');
    
    -- Apply the filter to the stream, with the results written
    -- to a new stream (backed by a new topic)
    CREATE STREAM target WITH (KAFKA_TOPIC='my_target_topic') AS
      SELECT * FROM my_source WHERE COL1='FOO';
    

    Then, from within your application, use the ksqlDB REST API to run a push query, which will return just the filtered messages:

    SELECT * FROM target EMIT CHANGES;
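
As a rough illustration of running that push query from .NET, here is a minimal sketch using only `HttpClient`. It assumes the ksqlDB server exposes the `/query` endpoint, accepts a JSON body with `ksql` and `streamsProperties` fields, and streams the result back line by line over a chunked response. The class name `KsqlPushQuery`, its method names, and the server address in the usage note are hypothetical, and the code targets .NET 5 or later. Parsing of the individual result lines is deliberately left out, since the exact framing may differ between server versions.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Net.Http;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of any ksqlDB client library.
public static class KsqlPushQuery
{
    // Builds the JSON request body (assumed shape: "ksql" + "streamsProperties").
    public static string BuildRequestBody(string sql)
    {
        var payload = new Dictionary<string, object>
        {
            ["ksql"] = sql,
            ["streamsProperties"] = new Dictionary<string, string>()
        };
        return JsonSerializer.Serialize(payload);
    }

    // Posts the push query and passes each non-empty response line to onLine
    // until the server closes the stream or the token is cancelled.
    public static async Task StreamAsync(
        Uri server, string sql, Action<string> onLine, CancellationToken ct)
    {
        // A push query never completes on its own, so disable the client timeout.
        using var http = new HttpClient
        {
            BaseAddress = server,
            Timeout = Timeout.InfiniteTimeSpan
        };

        using var request = new HttpRequestMessage(HttpMethod.Post, "/query")
        {
            Content = new StringContent(
                BuildRequestBody(sql), Encoding.UTF8, "application/vnd.ksql.v1+json")
        };

        // ResponseHeadersRead returns as soon as headers arrive, so the body
        // can be read incrementally instead of being buffered in full.
        using var response = await http.SendAsync(
            request, HttpCompletionOption.ResponseHeadersRead, ct);
        response.EnsureSuccessStatusCode();

        using var stream = await response.Content.ReadAsStreamAsync(ct);
        using var reader = new StreamReader(stream);

        while (!ct.IsCancellationRequested)
        {
            var line = await reader.ReadLineAsync();
            if (line == null) break;          // server closed the stream
            if (line.Length > 0) onLine(line);
        }
    }
}
```

A call might look like `await KsqlPushQuery.StreamAsync(new Uri("http://localhost:8088"), "SELECT * FROM target EMIT CHANGES;", Console.WriteLine, cts.Token);`, where the address is an assumption about your deployment. Since `target` is backed by the ordinary Kafka topic `my_target_topic`, pointing your existing .NET Kafka consumer at that topic may be a simpler alternative at the volumes you describe.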
    

    Aside from ksqlDB, you might also want to have a look at this recently released project from the community: https://github.com/LGouellec/kafka-streams-dotnet