Search code examples
node.js · apache-kafka · kafkajs

How to consume messages from a Kafka topic on a count basis (in batches) using the kafkajs npm package


I need to consume messages from a Kafka topic.

For example, suppose my topic has one thousand records. I need to consume messages on a count basis, with the count passed as a parameter: if I give a count of 100, only 100 messages should be consumed, and the next loop iteration should consume the next 100 records.

How can I do this using kafkajs?


Solution

  • I am answering my own question; this approach lets me process messages in a batch-like fashion.

    The code sample below collects incoming messages into an array for 5 seconds, then pauses the consumer and processes the collected data; once processing is complete, it resumes the consumer.

    // Load the Kafka client class from the kafkajs package.
    const { Kafka } = require('kafkajs');

    // Client identified as 'consumer', pointed at a single broker address.
    // NOTE(review): 'kafka:0' looks like a placeholder host:port — confirm before use.
    const kafka = new Kafka({
        clientId: 'consumer',
        brokers: ['kafka:0']
    });

    // Consumer that joins the 'test_topic_groups' consumer group; it is shared
    // with the run() function below.
    const consumer = kafka.consumer({ groupId: 'test_topic_groups', allowAutoTopicCreation: true });
    
    /**
     * Connects the consumer, subscribes to 'test-topic' from the beginning, and
     * handles messages in time-boxed batches:
     *
     *   1. Every received message is appended to an in-memory buffer.
     *   2. The first message of a cycle starts a 5 second collection timer.
     *   3. When the timer fires, the consumer is paused and the buffered
     *      messages are handed to doyourProcessandResume().
     *   4. Once processing finishes, the consumer is resumed and the next
     *      message starts a new cycle.
     *
     * @returns {Promise<void>} resolves once consumer.run() has been started;
     *   rejects if connect/subscribe/run fails.
     */
    const run = async () => {
        const topic = 'test-topic';
        const collectWindowMs = 5000;
        // Stand-in for real processing time in the sample.
        const processDelayMs = 2000;

        // Consuming
        await consumer.connect();
        await consumer.subscribe({ topic, fromBeginning: true });

        // True from the moment a collection timer is scheduled until the
        // consumer has been resumed, so only one cycle is in flight at a time.
        let isPaused = false;
        // Messages received since the last batch was handed off.
        const data = [];

        /**
         * Processes one batch and then resumes the consumer.
         *
         * @param {Array<object>} batch - messages collected during the last window.
         */
        function doyourProcessandResume(batch) {
            // Do the processing of `batch` here; for now a timeout stands in
            // for the work, after which the consumer is resumed.
            setTimeout(() => {
                isPaused = false;
                consumer.resume([{ topic }]);
                console.log('Consumer resumed..............');
            }, processDelayMs);
        }

        await consumer.run({
            eachMessage: async ({ message }) => {
                data.push(message);
                if (isPaused) {
                    // A cycle is already scheduled or being processed; the
                    // message stays buffered for the next hand-off.
                    return;
                }
                isPaused = true;
                setTimeout(() => {
                    consumer.pause([{ topic }]);
                    // Drain the buffer so each message is processed exactly once
                    // and the array does not grow across cycles.
                    const batch = data.splice(0, data.length);
                    doyourProcessandResume(batch);
                    console.log('Consumer paused...........');
                }, collectWindowMs);
            }
        });
    };
    run().catch(console.error);