I've set up tweepy to fetch tweets and write them to a topic TWEEPY_TOPIC, and a stream to read from that topic.
-- Create a stream over the topic that tweepy writes into
CREATE STREAM TWEEPY_STREAM (
    id BIGINT,
    lang VARCHAR,
    tweet VARCHAR,
    user STRUCT<id BIGINT, screen_name VARCHAR>)
  WITH (
    KAFKA_TOPIC = 'TWEEPY_TOPIC',
    VALUE_FORMAT = 'AVRO'
  );
A second stream reads from the one above and writes to another topic, which Kafka Connect pushes to Elasticsearch.
-- Create a second topic enriched with ML classifications.
-- GETSENTIMENT and GETFOURCLASS are custom KSQL functions.
CREATE STREAM ELASTIC_STREAM
  WITH (
    KAFKA_TOPIC = 'ELASTIC_TOPIC',
    VALUE_FORMAT = 'AVRO',
    PARTITIONS = 1,
    REPLICAS = 1
  )
  AS SELECT
    id,
    lang,
    tweet,
    user,
    GETSENTIMENT(tweet) AS sentiment,
    GETFOURCLASS(tweet) AS fourclass
  FROM TWEEPY_STREAM;
The user-defined functions GETSENTIMENT and GETFOURCLASS make a POST request to a Python model server, which returns the classification. Each API call currently takes roughly 0.5–1 second.
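That latency feeds directly into the retention concern below: a synchronous 0.5–1 s call caps a single stream task at roughly 1–2 tweets per second. A back-of-the-envelope check (illustrative stdlib arithmetic only, assuming one single-threaded task and the best-case 0.5 s latency):

```python
# Upper bound on how many tweets one stream task can score
# before the oldest unread message ages out of the topic.
SECONDS_PER_CALL = 0.5                 # best-case model-server latency
RETENTION_SECONDS = 7 * 24 * 60 * 60   # default 7-day retention

calls_per_second = 1 / SECONDS_PER_CALL
max_scored_per_retention = RETENTION_SECONDS * calls_per_second

print(int(max_scored_per_retention))  # 1209600 tweets in 7 days, at best
```

So if tweets arrive faster than about 1–2 per second for a sustained period, the backlog grows and data can expire before it is processed, regardless of retention settings.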
My concern is that if data in the first topic TWEEPY_TOPIC is deleted after the default retention period (7 days), it will never be picked up by ELASTIC_STREAM. Is there a way to set some kind of flag telling Kafka not to delete data that hasn't been processed yet? I'm open to redesign suggestions as well.
Kafka has no cleanup policy that deletes only messages that have already been consumed.
An alternative approach is to use compacted topics. Compacted topics have a different cleanup policy: only the latest message for each unique key is kept.
Once a message has been consumed, you can send a new message with the same key and a null value to the compacted topic. This marks the message as a tombstone, and the Log Cleaner will delete it on the next compaction cycle.
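To make the mechanics concrete, here is a toy Python model of compaction and tombstoning. This is illustrative only: the keys and statuses are made up, and in a real setup the null-valued record would be sent through a Kafka producer client.

```python
def compact(log):
    """Toy model of Kafka log compaction: keep only the latest record per
    key, then drop keys whose latest value is None (a tombstone)."""
    latest = {}
    for key, value in log:  # records in offset order
        latest[key] = value
    return {k: v for k, v in latest.items() if v is not None}

# Hypothetical per-tweet processing log, keyed by tweet id.
log = [
    ("tweet-1", "unprocessed"),
    ("tweet-2", "unprocessed"),
    ("tweet-1", None),  # tombstone: tweet-1 has been consumed
]

print(compact(log))  # {'tweet-2': 'unprocessed'}
```

After compaction, only the unconsumed tweet survives, which is exactly the "keep what hasn't been processed yet" behaviour the question asks for.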