This is my first time using Kafka. I am learning it with a microservice architecture and have run into the following issue.
Every time I restart my service, it processes all the messages in the topics again. Is there a way to process each message only once, for example by flagging it as read?
This is my snippet in Python 3:
    import json

    from kafka import KafkaConsumer


    class EmailStreamConsumer:
        def __init__(self, bootstrap_servers='localhost:9092'):
            self.__bootstrap_servers = bootstrap_servers
            self.__new_emails_consumer = KafkaConsumer('NewEmails', bootstrap_servers=bootstrap_servers,
                                                       auto_offset_reset='earliest')
            self.__sent_emails_consumer = KafkaConsumer('SentEmails', bootstrap_servers=bootstrap_servers,
                                                        auto_offset_reset='earliest')

        def start(self):
            for message in self.__new_emails_consumer:
                value = message.value.decode('utf-8')
                email = json.loads(value)
                self.send_email(email['content'], email['to_email'], email['title'], email['from_email'])
                print("%s:%d:%d: key=%s value=%s" % (
                    message.topic, message.partition, message.offset, message.key, message.value))
I want the service to send each email only once, even after the service is restarted.
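I think the problem is that your Kafka consumer has no group ID. Without one, the consumer does not commit offsets, so with auto_offset_reset='earliest' it reads the topic from the beginning on every start.
Just add a group ID to the consumer properties (shown here with the Java client):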
    String groupId = "kafka-new-emails";
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
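
The snippet above is Java, while the question uses Python. Assuming the question's client is kafka-python, the matching setting is the `group_id` keyword argument of `KafkaConsumer`. Here is a minimal sketch of that; the helper names `consumer_kwargs` and `make_consumer` are made up for illustration, and the group name is taken from the Java snippet.

```python
def consumer_kwargs(group_id, bootstrap_servers='localhost:9092'):
    """Build keyword arguments for kafka-python's KafkaConsumer (hypothetical helper)."""
    return {
        'bootstrap_servers': bootstrap_servers,
        # Offsets are committed under this group, so a restart can resume from them.
        'group_id': group_id,
        # Used only when the group has no committed offset yet.
        'auto_offset_reset': 'earliest',
    }


def make_consumer(topic, group_id):
    """Create a consumer for one topic; needs kafka-python and a reachable broker."""
    # Imported here so consumer_kwargs() stays usable without kafka-python installed.
    from kafka import KafkaConsumer
    return KafkaConsumer(topic, **consumer_kwargs(group_id))
```

In the question's class, this would amount to passing `group_id='kafka-new-emails'` to the `KafkaConsumer` constructor for `'NewEmails'`.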
Once the group ID is set, the consumer commits its offsets under that consumer group, so after a restart your application resumes from the last committed offset instead of reading the topic from the beginning. (auto_offset_reset only applies when the group has no committed offset yet.) Also, if you run more than one consumer in the group and one of them goes down, Kafka rebalances the group so that a consumer that is still online takes over the partitions that were assigned to the one that failed.
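
Since the resume point is the last committed offset, one option is to commit only after an email has actually been sent. Here is a rough sketch of that idea, assuming kafka-python and a consumer created with a `group_id` and `enable_auto_commit=False`. The names `process_messages` and `handle` are hypothetical. Note that this gives at-least-once delivery: a crash between sending and committing would still resend that one email after a restart.

```python
import json


def process_messages(consumer, handle):
    """Handle each message, committing its offset only after handle() returns.

    `consumer` is assumed to behave like kafka-python's KafkaConsumer:
    iterable over messages with a bytes `.value`, plus a `commit()` method.
    `handle` is a hypothetical callback, e.g. one that sends the email.
    """
    for message in consumer:
        email = json.loads(message.value.decode('utf-8'))
        handle(email)
        # Synchronous commit of the consumed position; if handle() raised,
        # this line is never reached and the message is redelivered later.
        consumer.commit()
```

Committing after every message is simple but adds a broker round trip per email. Committing in batches is cheaper, at the cost of more possible resends after a crash.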