I'd like to consume data with a Consumer from topic1, process each message, then send the processed messages back into Kafka on topic2:

Kafka --> Consumer (processes messages from topic1) --> Producer (sends processed messages to topic2) --> Kafka
My attempt:
consumer.on('message', (message) => {
    let processedMsg = processMessage(message);
    payloads = [
        { topic: 'topic2', messages: processedMsg }
    ];
    producer.on('ready', function () {
        producer.send(payloads, function (err, data) {
            console.log(data);
        });
    });
    producer.on('error', function (err) {});
});
But the Producer cannot send the processed messages to Kafka. This is the error I get:
MaxListenersExceededWarning: Possible EventEmitter memory leak detected. 11 ready listeners added. Use emitter.setMaxListeners() to increase limit
I'm using the kafka-node Node module.
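For context, the consumer and producer are created roughly like this (the broker address and consumer options are placeholders for my real config):

const kafka = require('kafka-node');

// Placeholder broker address -- the actual config differs
const client = new kafka.KafkaClient({ kafkaHost: 'localhost:9092' });

const consumer = new kafka.Consumer(
    client,
    [{ topic: 'topic1' }],
    { autoCommit: true }
);

const producer = new kafka.Producer(client);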
You need to swap the nesting of the producer ready listener and the consumer message listener. Otherwise, you're registering a new ready listener for each and every consumed message, which is what triggers the MaxListenersExceededWarning.

For example:
producer.on('ready', function () {
    consumer.on('message', (message) => {
        let processedMsg = processMessage(message);
        const payloads = [
            { topic: 'topic2', messages: processedMsg }
        ];
        producer.send(payloads, function (err, data) {
            if (err) return console.error(err);
            console.log(data);
        });
    });
});
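For the same reason, the producer 'error' listener from your original snippet should also be registered once, outside the message callback.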
If you're mostly processing messages and forwarding them to new topics, though, I would suggest looking at this library: https://github.com/nodefluent/kafka-streams/
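A rough sketch of what that could look like with kafka-streams (the config object here is an assumption; check the project's README for the actual options and message format):

const { KafkaStreams } = require("kafka-streams");

// Config shape is assumed -- see the kafka-streams README for real options
const kafkaStreams = new KafkaStreams({
    kafkaHost: "localhost:9092",
    groupId: "example-group"
});

const stream = kafkaStreams.getKStream("topic1");

stream
    .map(message => processMessage(message)) // same processing step as above; message shape may differ from kafka-node's
    .to("topic2");

stream.start();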