python, events, apache-kafka, event-driven

How to process messages in Kafka only once, so that a service doesn't reprocess all messages when it is restarted


This is my first time using Kafka. I am learning it with a microservice architecture, and I am running into the following issue.

Every time I restart my service, it processes all the messages in the topics again. Is there a way to process each message only once, flag it as read, or something similar?

This is my snippet in Python 3:

import json

from kafka import KafkaConsumer


class EmailStreamConsumer:
    def __init__(self, bootstrap_servers='localhost:9092'):
        self.__bootstrap_servers = bootstrap_servers
        self.__new_emails_consumer = KafkaConsumer('NewEmails', bootstrap_servers=bootstrap_servers,
                                                   auto_offset_reset='earliest')
        self.__sent_emails_consumer = KafkaConsumer('SentEmails', bootstrap_servers=bootstrap_servers,
                                                    auto_offset_reset='earliest')

    def start(self):
        for message in self.__new_emails_consumer:
            value = message.value.decode('utf-8')
            email = json.loads(value)
            self.send_email(email['content'], email['to_email'], email['title'], email['from_email'])
            print("%s:%d:%d: key=%s value=%s" % (
                message.topic, message.partition, message.offset, message.key, message.value))

I wish that the service sends the emails only once. Even when the service is restarted.


Solution

  • I think your problem is that you don't have a group id set on your Kafka consumer.

    Just pass a group_id when you create the consumer:

    consumer = KafkaConsumer('NewEmails',
                             bootstrap_servers=bootstrap_servers,
                             auto_offset_reset='earliest',
                             group_id='kafka-new-emails')
    

    Your application will start reading from the last committed offset, because the consumer group records where your last commit was. Also, if you have more than one consumer and one of them goes down, the consumer group triggers a rebalance, so a consumer that is still online takes over the partitions that were assigned to the one that went down.
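
    Putting this together for the kafka-python client used in the question, a minimal sketch might look like the following. The broker address and the group id `kafka-new-emails` are assumptions; with a group_id set, kafka-python auto-commits offsets by default (enable_auto_commit=True), so a restarted service resumes from the last committed offset rather than re-reading the whole topic:

    ```python
    import json

    from kafka import KafkaConsumer


    def decode_email(raw_bytes):
        # Deserialize a message value produced as UTF-8 JSON,
        # as in the question's snippet.
        return json.loads(raw_bytes.decode('utf-8'))


    def build_consumer(bootstrap_servers='localhost:9092'):
        # group_id makes the consumer commit its offsets to Kafka
        # (enable_auto_commit defaults to True), so restarting the
        # service does not reprocess already-consumed messages.
        return KafkaConsumer(
            'NewEmails',
            bootstrap_servers=bootstrap_servers,   # assumed broker address
            group_id='kafka-new-emails',           # any stable name for this service
            auto_offset_reset='earliest',          # only applies when no offset is committed yet
            value_deserializer=decode_email,
        )


    if __name__ == '__main__':
        consumer = build_consumer()
        for message in consumer:
            email = message.value
            print("%s:%d:%d" % (message.topic, message.partition, message.offset))
    ```

    Note that auto-commit gives at-least-once delivery: if the service crashes after sending an email but before the next offset commit, that message may be processed again on restart. For tighter control you can set enable_auto_commit=False and call consumer.commit() only after the email has actually been sent.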