Tags: apache-kafka, ksqldb

Retain data in topic that hasn't been processed yet


I've set up tweepy to fetch tweets and write them to a topic TWEEPY_TOPIC, and a ksqlDB stream to read from that topic.

-- Register a stream over the topic that tweepy writes into
CREATE STREAM TWEEPY_STREAM (
    id BIGINT,
    lang VARCHAR,
    tweet VARCHAR,
    user STRUCT<id BIGINT,
                screen_name VARCHAR>)
    WITH (
        KAFKA_TOPIC = 'TWEEPY_TOPIC',
        VALUE_FORMAT = 'AVRO'
        );
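For context, the producer side looks roughly like this. It's a minimal sketch rather than the exact code: it assumes tweepy v3's StreamListener API (v4 renamed these classes) and confluent-kafka, and it serializes to JSON for brevity, whereas the real topic is Avro via Schema Registry.

import json

import tweepy  # assumes the tweepy v3 StreamListener API
from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})  # placeholder broker

class TweetListener(tweepy.StreamListener):
    def on_status(self, status):
        # Shape the record to match the TWEEPY_STREAM schema above.
        record = {
            "id": status.id,
            "lang": status.lang,
            "tweet": status.text,
            "user": {"id": status.user.id,
                     "screen_name": status.user.screen_name},
        }
        # JSON here for brevity; the real topic is Avro, which would go
        # through a Schema Registry-aware serializer instead.
        producer.produce("TWEEPY_TOPIC", value=json.dumps(record))
        producer.poll(0)  # serve delivery callbacks

auth = tweepy.OAuthHandler("CONSUMER_KEY", "CONSUMER_SECRET")
auth.set_access_token("ACCESS_TOKEN", "ACCESS_TOKEN_SECRET")
tweepy.Stream(auth, TweetListener()).filter(track=["kafka"])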

There's also another stream that reads from the stream above and writes to a second topic, which Kafka Connect then pushes to Elasticsearch.

-- Create another topic with ML data.
-- GETSENTIMENT and GETFOURCLASS are custom ksql functions
CREATE STREAM ELASTIC_STREAM
WITH (
    KAFKA_TOPIC = 'ELASTIC_TOPIC',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 1, REPLICAS = 1
)
AS SELECT 
    id,
    lang,
    tweet,
    user,
    GETSENTIMENT(tweet) as sentiment,
    GETFOURCLASS(tweet) as fourclass
FROM TWEEPY_STREAM;
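The Connect side is a standard Elasticsearch sink. Roughly, I register it against the Connect REST API like this (a sketch: the connector name, hosts, and settings such as key.ignore/schema.ignore are placeholders, and some config keys vary across Elasticsearch sink connector versions):

import json

import requests  # Kafka Connect's REST API is assumed on its default port 8083

# Hypothetical connector config; names and URLs are placeholders.
connector = {
    "name": "elastic-sink",
    "config": {
        "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
        "topics": "ELASTIC_TOPIC",
        "connection.url": "http://localhost:9200",
        "key.ignore": "true",
        "schema.ignore": "true",
    },
}

response = requests.post(
    "http://localhost:8083/connectors",
    headers={"Content-Type": "application/json"},
    data=json.dumps(connector),
)
response.raise_for_status()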

The user-defined functions GETSENTIMENT and GETFOURCLASS make a POST request to a Python model server, which returns the classification. These API responses currently take close to 0.5-1 second each.
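Schematically, the model server is just an HTTP endpoint like the following (a sketch with a placeholder model and a hypothetical /sentiment route, not the real service):

from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route("/sentiment", methods=["POST"])  # hypothetical route
def sentiment():
    text = request.get_json()["text"]
    # Placeholder for the real model inference; this call is where the
    # 0.5-1 second of latency per request comes from.
    label = "positive" if "good" in text.lower() else "negative"
    return jsonify({"sentiment": label})

if __name__ == "__main__":
    app.run(port=5000)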

My concern is that if the data in the first topic TWEEPY_TOPIC is cleared after the default retention period (7 days), it will never get picked up by ELASTIC_STREAM. Is there any way to set some kind of flag to tell Kafka not to delete data that hasn't been processed yet? I'm open to redesign suggestions as well.


Solution

  • Kafka does not have a cleanup policy that deletes only messages that have already been consumed; retention is purely time- or size-based and is independent of consumer progress.

    An alternative approach is to use a compacted topic. Compacted topics have a different cleanup policy: instead of deleting records by age, Kafka keeps at least the latest message for each unique key.

    Once a message has been consumed, you could send a new message with the same key and a null value to the compacted topic. That null-value message is a tombstone: it marks the key for deletion, and the Log Cleaner removes it (along with older messages for that key) on the next compaction cycle. A sketch of both steps follows.
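A minimal sketch using confluent-kafka (the broker address and topic name are placeholders):

from confluent_kafka import Producer
from confluent_kafka.admin import AdminClient, NewTopic

BROKER = "localhost:9092"  # placeholder

# 1. Create a compacted topic: cleanup.policy=compact keeps at least the
#    latest record per key instead of deleting records by age.
admin = AdminClient({"bootstrap.servers": BROKER})
topic = NewTopic(
    "TWEETS_COMPACTED",  # hypothetical topic name
    num_partitions=1,
    replication_factor=1,
    config={"cleanup.policy": "compact"},
)
for future in admin.create_topics([topic]).values():
    future.result()  # raises if topic creation failed

# 2. Once a record has been fully processed downstream, produce a
#    tombstone for it: the same key with a null value.
producer = Producer({"bootstrap.servers": BROKER})
producer.produce("TWEETS_COMPACTED", key="1234567890", value=None)
producer.flush()

Note that this requires the records to be keyed (e.g. by tweet id), and compaction only runs on closed log segments, so tombstoned records are not removed immediately.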