apache-kafka · kafka-consumer-api · kafka-python

How to consume a Kafka queue every 30 minutes in Python


I'm having an issue with a Kafka consumer in Python. Here is my use case: a Kafka producer sends data every second as a real-time stream, but the Kafka consumer needs to run only every 30 minutes and collect the accumulated batch of data. How can I do this?


Solution

  • If you don't want to process your data in real time, you may want to reconsider whether Kafka is the right solution for you. However, you can try this:

     import java.util.Arrays;
     import java.util.Properties;

     import org.apache.kafka.clients.consumer.ConsumerRecord;
     import org.apache.kafka.clients.consumer.ConsumerRecords;
     import org.apache.kafka.clients.consumer.KafkaConsumer;

     Properties props = new Properties();
     props.put("bootstrap.servers", "localhost:9092");
     props.put("group.id", "your_consumer_group");
     props.put("enable.auto.commit", "true");
     props.put("auto.commit.interval.ms", "1000");
     props.put("session.timeout.ms", "30000");
     props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
     KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
     consumer.subscribe(Arrays.asList("foo", "bar"));
     while (true) {
         // A single poll returns at most one batch of records; poll in a loop
         // if you need to drain everything that accumulated since the last run.
         ConsumerRecords<String, String> records = consumer.poll(1000);
         for (ConsumerRecord<String, String> record : records) {
             System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
         }

         // After the data is consumed, put the thread to sleep until the next
         // 30-minute run. Note: going this long without polling will normally
         // get the consumer evicted from its group (see session.timeout.ms),
         // so consider closing the consumer here and re-creating it on wake-up.
         // (The enclosing method must handle InterruptedException.)
         Thread.sleep(30 * 60 * 1000);
     }
    

     If you'd like your batch processing to run at the 0th or 30th minute of every hour instead, you can use this sleep:

     Thread.sleep((30 * 60 * 1000) - System.currentTimeMillis() % (30 * 60 * 1000)); // time left until the next 30-minute boundary
    

    It will make your consumer wake up at 00:00, 00:30, 01:00, 01:30, etc. Follow this link for details: https://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html
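
     Since the question is tagged kafka-python, roughly the same loop in Python might look like the sketch below. This is an assumption, not part of the original answer: it uses the kafka-python package, and the topic names, group id, and timeout are placeholders. Opening a fresh consumer each cycle and closing it before the long sleep also keeps the group coordinator from evicting an idle member:

     import time

     from kafka import KafkaConsumer  # pip install kafka-python

     INTERVAL_MS = 30 * 60 * 1000

     def consume_batch():
         # A fresh consumer per cycle, so the group coordinator does not
         # evict an idle member during the 30-minute sleep.
         consumer = KafkaConsumer(
             "foo", "bar",
             bootstrap_servers="localhost:9092",
             group_id="your_consumer_group",
             auto_offset_reset="earliest",  # on the very first run, start from the beginning
             consumer_timeout_ms=5000,      # stop iterating once the backlog is drained
         )
         batch = [record.value for record in consumer]
         consumer.close()
         return batch

     while True:
         batch = consume_batch()
         print("collected %d records" % len(batch))
         # Sleep until the next 0th or 30th minute of the hour:
         time.sleep((INTERVAL_MS - int(time.time() * 1000) % INTERVAL_MS) / 1000)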

     Again, you probably don't want to use Kafka this way. It might be better to dump the data to some storage (Parquet files partitioned by date-time, for example) and run a batch job over it every 30 minutes.
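
     If you go that route, here is a hypothetical sketch of the dump step in Python (assuming pandas with a Parquet engine such as pyarrow installed; the data/ directory layout and the value column name are made up for illustration):

     import os
     import datetime as dt

     import pandas as pd  # to_parquet requires a Parquet engine such as pyarrow

     def dump_batch(batch):
         # Hypothetical helper: write one 30-minute batch of record values to a
         # date-time-partitioned path such as data/2024-01-01/13-30.parquet.
         path = dt.datetime.utcnow().strftime("data/%Y-%m-%d/%H-%M.parquet")
         os.makedirs(os.path.dirname(path), exist_ok=True)
         pd.DataFrame({"value": batch}).to_parquet(path)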