node.js apache-kafka kafka-consumer-api kafkajs

KafkaJS: run multiple consumers in a single file


I am using Kubernetes microservices, and I have created a single Kafka microservice to consume all topics and perform its work. Code sample below:

const { Kafka, logLevel } = require("kafkajs");
const kafka = new Kafka({
   logLevel: logLevel.INFO,
   clientId: "kafka9845",
   brokers:["90.45.78.123"],
   connectionTimeout:30000,
   sessionTimeout:30000,
   requestTimeout:30000,
   heartbeatInterval:10000,
   retry: {
      initialRetryTime:5000,
      retries:200
   }
});

const topicGroup1 = ["topic1","topic2"];
const topicGroup2 = ["topic3", "topic4", "topic5"];

const consumer1 = kafka.consumer({groupId: 'consumer1', fromBeginning: true});
const funConsumer1 = async (consumer1) => {
   await topicGroup1.forEach((topic) => {
      consumer1.subscribe({topic: topic});
   });
   await consumer1.run({
      autoCommit: false,
      eachMessage: async (task) => {
         console.log(task);
         await consumer1.commitOffsets([{topic: task.topic, partition: task.partition, offset: (Number(task.message.offset) + 1).toString()}]);
      }
   });
}

const consumer2 = kafka.consumer({groupId: 'consumer2', fromBeginning: true});
const funConsumer2 = async (consumer2) => {
   await topicGroup2.forEach((topic) => {
      consumer2.subscribe({topic: topic});
   });
   await consumer2.run({
      autoCommit: false,
      eachMessage: async (task) => {
         console.log(task);
         await consumer2.commitOffsets([{topic: task.topic, partition: task.partition, offset: (Number(task.message.offset) + 1).toString()}]);
      }
   });
}

consumer1.on('consumer.crash', async (payload) => {
   try {
      consumer1.disconnect();
   } catch (error) {

   } finally {
      setTimeout(async () => {
         await consumer1.connect();
         funConsumer1(consumer1).catch(console.error);
      }, 5000);
   }
});

consumer2.on('consumer.crash', async (payload) => {
   try {
      consumer2.disconnect();
   } catch (error) {

   } finally {
      setTimeout(async () => {
         await consumer2.connect();
         funConsumer2(consumer2).catch(console.error);
      }, 5000);
   }
});


const funConnect = () => {
   consumer1.connect();
   consumer2.connect();
   funConsumer1(consumer1).catch(console.error);
   funConsumer2(consumer2).catch(console.error);
}

funConnect();
process.on('SIGINT', function () {
   consumer1.disconnect();
   consumer2.disconnect();
});

I am using "kafkajs":"^2.0.0" and "kafka-node":"^5.0.0"

Every day we produce millions of messages across the different topics.

Problem: I have observed that the consumers process messages sequentially, one after the other, like a queue.

Requirement: Both consumers should consume messages in parallel.


Solution

  • If you're using Kubernetes, then just make a Deployment for each consumer process (see the per-Deployment sketch after this list). The code provided doesn't have the consumers interacting with one another, so they don't have to be in the same process.

    Otherwise, there's no reason that one group/consumer cannot subscribe to multiple topics. You should not use forEach to subscribe, however; pass the whole array to the subscribe function (kafkajs 2.x accepts a topics array).

    You can check which topic a message came from, when one consumer is subscribed to multiple topics, with an if/switch on the topic field inside the eachMessage handler (see the multi-topic sketch after this list).
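
A per-Deployment sketch of the first option, assuming each consumer gets its own entry-point file (e.g. a hypothetical consumer1-worker.js) built into its own image and run as a separate Deployment. The broker list, group id and topics are taken from the question; the file name and client id are illustrative:

// consumer1-worker.js — hypothetical standalone entry point for the first consumer.
// A matching consumer2-worker.js would do the same with groupId "consumer2" and topicGroup2.
const { Kafka, logLevel } = require("kafkajs");

const kafka = new Kafka({
   logLevel: logLevel.INFO,
   clientId: "kafka9845-consumer1",
   brokers: ["90.45.78.123"]
});

const consumer = kafka.consumer({ groupId: "consumer1" });

const run = async () => {
   await consumer.connect();
   // kafkajs 2.x takes the whole topic list in a single subscribe call
   await consumer.subscribe({ topics: ["topic1", "topic2"], fromBeginning: true });
   await consumer.run({
      eachMessage: async ({ topic, partition, message }) => {
         console.log(topic, partition, message.offset);
      }
   });
};

run().catch(console.error);
process.on("SIGINT", () => consumer.disconnect());

Each worker file then gets its own image/Deployment; because the group ids differ, Kafka treats them as independent consumer groups, so the two workloads consume in parallel without coordinating with each other.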
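
A multi-topic sketch of the second option, reusing the kafka instance and the manual-commit style from the question: a single consumer subscribed to all five topics, with a switch on topic inside eachMessage deciding how each message is handled. The per-topic branches are placeholders:

const consumer = kafka.consumer({ groupId: "consumer1" });

const run = async () => {
   await consumer.connect();
   await consumer.subscribe({
      topics: ["topic1", "topic2", "topic3", "topic4", "topic5"],
      fromBeginning: true
   });
   await consumer.run({
      autoCommit: false,
      eachMessage: async ({ topic, partition, message }) => {
         switch (topic) {
            case "topic1":
            case "topic2":
               // placeholder: handle the first group of topics
               console.log("group 1", topic, message.offset);
               break;
            default:
               // placeholder: handle topic3, topic4 and topic5
               console.log("group 2", topic, message.offset);
         }
         // a manual commit needs topic, partition and the next offset
         await consumer.commitOffsets([
            { topic, partition, offset: (Number(message.offset) + 1).toString() }
         ]);
      }
   });
};

run().catch(console.error);

Note that eachMessage still processes messages within a single partition one at a time; if you need more parallelism across partitions, look at the partitionsConsumedConcurrently option of consumer.run.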