I need to consume the message from Kafka topic.
For example, my topic had one thousand records. I need to consume the message count basis. Count as parameter. i.e., if I give count 100 then consume 100 messages only, next loop consumes next 100 records need to consume.
how to do using kafkajs.
I answer my own question, it is helps me to do like batch process.
The below code sample consume the data 5sec, consumed data pushed into an array, then do the process after completion of process resume the consumer
var { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: 'consumer',
brokers: ['kafka:0']
})
const consumer = kafka.consumer({ groupId: 'test_topic_groups', allowAutoTopicCreation: true })
const run = async () => {
// Consuming
await consumer.connect()
await consumer.subscribe({ topic: 'test-topic', fromBeginning: true })
var isPaused = false;
var data = []
await consumer.run({
eachMessage: async ({ topic, partition, message }) => {
data.push(message)
if (!isPaused) {
isPaused = true;
setTimeout(async () => {
consumer.pause([{ "topic": "test-topic" }])
doyourProcessandResume(data)
console.log('Consumer paused...........')
}, 5000)
}
}
})
function doyourProcessandResume(data) {
// Do the process here , now i put timeout only then resume the consumer
setTimeout(() => {
isPaused = false
consumer.resume([{ "topic": 'test-topic' }]);
console.log('Consumer resumed..............')
}, 2000)
}
}
run().catch(console.error)