Search code examples
node.jsapache-kafkalibrdkafka

How to read message one by one in node-rdkafka


I'm using node-rdkafka (https://github.com/Blizzard/node-rdkafka) to consume messages, the basic setup is working fine but it triggers the function every time I push something to the queue, irrespective of completion of previous method.

I want the next data unit to be triggered when the previous function is done.

here is my implementation

const Kafka = require('node-rdkafka');
const topic = 'create_user_channel';
const consumer = new Kafka.KafkaConsumer({
    'group.id':'consumer',
    'metadata.broker.list': '*******',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': '********',
    'sasl.password': '********',
    'security.protocol': 'SASL_SSL',
    'enable.auto.commit':false
}, {});

// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
    if (err) {
        console.log(`Error connecting to Kafka broker: ${err}`);
        process.exit(-1);
    }

});
let is_pause = false;
consumer.on('ready', (arg)=>{
    console.log('consumer ready.' + JSON.stringify(arg));
    console.log('Consumer is ready');
    consumer.subscribe([topic]);
    setInterval(function() {
        console.log('consumer has consume on :'+timeMs());  
        consumer.consume();
      }, 1000);
});

consumer.on('data',async (data)=>{
    console.log('consumer is consuming data');
    if(!is_pause) {
        is_pause = true;
        if(data && typeof data !== 'undefined') {
            try {
                console.log('consumer received the data');
                consumer.pause([topic]);
                console.log('consumer has pause the consuming');
                await processMessage(data);
                console.log('consumer is resumed');
                consumer.resume([topic]);
                is_pause = false;
            } catch(error) {
                console.log('data consuming error');
                console.log(error);
            }
        } else {
            is_pause = false;
        }
    }
});



Solution

  • You are calling consume() (without any arguments) which returns messages as fast as possible.

    If you want to control the consumption pace, you can use the other method consume(size), that returns size Kafka records. For example consume(1) will return the next Kafka record.

    See the node-rdkafka Consumer docs.