I'm using node-rdkafka (https://github.com/Blizzard/node-rdkafka) to consume messages. The basic setup works fine, but the handler is triggered every time I push something to the queue, regardless of whether the previous invocation has finished.
I want the next message to be processed only after the handler for the previous message has completed.
Here is my implementation:
const Kafka = require('node-rdkafka');

const topic = 'create_user_channel';

const consumer = new Kafka.KafkaConsumer({
  'metadata.broker.list': '*******',
  'sasl.mechanisms': 'PLAIN',
  'sasl.username': '********',
  'sasl.password': '********',
  'security.protocol': 'SASL_SSL',
}, {});

// Connect the consumer.
consumer.connect({timeout: "1000ms"}, (err) => {
  if (err) {
    console.log(`Error connecting to Kafka broker: ${err}`);
  }
});

// True while a message is being handled by processMessage(); used to keep
// a second 'data' event from starting work before the first has finished.
let is_pause = false;

consumer.on('ready', (arg) => {
  console.log('consumer ready.' + JSON.stringify(arg));
  console.log('Consumer is ready');

  // Heartbeat log, once per second.
  // NOTE(review): timeMs() is not defined in this snippet; it is assumed to
  // be a helper declared elsewhere.
  setInterval(function() {
    console.log('consumer has consume on :' + timeMs());
  }, 1000);

  // Flowing mode: calling consume() with no arguments emits a 'data' event
  // for every message as soon as it is fetched.
  consumer.subscribe([topic]);
  consumer.consume();
});

/**
 * Handles one Kafka message at a time.
 *
 * A message that arrives while another one is still being processed is
 * skipped by the is_pause guard (it is not queued or retried here).
 *
 * @param {object} data - The message delivered by the 'data' event.
 */
consumer.on('data', async (data) => {
  console.log('consumer is consuming data');

  if (is_pause) {
    return;
  }
  is_pause = true;

  try {
    if (data) {
      console.log('consumer received the data');
      console.log('consumer has pause the consuming');
      await processMessage(data);
      console.log('consumer is resumed');
    }
  } catch (error) {
    console.log('data consuming error', error);
  } finally {
    // Always release the guard, including when processMessage() throws;
    // otherwise a single failure would block every later message.
    is_pause = false;
  }
});
You are calling consume() without any arguments, which returns messages as fast as possible.
If you want to control the consumption pace, you can use the other form, consume(size), which returns up to `size` Kafka records. For example, consume(1) will return only the next Kafka record.
See the node-rdkafka Consumer docs.