Search code examples
node.jsapache-kafkakafka-consumer-apikafka-producer-api

Should Consumer process messages, then send messages back to Kafka


I'd like to process data from a Consumer with topic1, then send messages back into Kafka to topic2

Kafka --> Consumer (processing messages) from topic1, then call a Producer to send processed message to topic2 --> Kafka

My try:

 consumer.on('message', (message) => {
     let processedMsg = processMessage(message);

     payloads = [
        { topic: 'topic2', messages: processedMsg }
     ];

     producer.on('ready', function () {
         producer.send(payloads, function (err, data) {
             console.log(data);
         });
     });

     producer.on('error', function (err) {})
});

But, Producer cannot send processed messages into Kafka. The error what I got

MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit

I use node module Kafka-node


Solution

  • You need to switch the order of the producer ready listener and the consumer message listener.

    Otherwise, you're setting up the ready listener for each and every consumed message

    For example

     producer.on('ready', function () {
        consumer.on('message', (message) => {
          let processedMsg = processMessage(message);
    
          payloads = [
           { topic: 'topic2', messages: processedMsg }
          ];
    
           producer.send(payloads, function (err, data) {
             console.log(data);
           });
     });
    

    I would suggest looking at this library, though, if mostly processing and forwarding to new topics https://github.com/nodefluent/kafka-streams/