I have to set zookeeper IP address in ConsumerGroup instead of Kafka-Host IP address. Because I have set replication factor as 3 and 3 brokers were created. So, if one host fails then the another one can take over.
When I tried to put zookeeper IP address instead of Kafka-Host IP address in ConsumerGroup, it doesn't receive any messages which were sent from Producer API.
var kafka = require('kafka-node')
var ConsumerGroup = kafka.ConsumerGroup
function createConsumerGroup () {
var options = {
kafkaHost: '127.0.0.1:9092',
batch: undefined,
ssl: true,
groupId: 'demoExample',
protocol: ['roundrobin'],
encoding: 'utf8',
fromOffset: 'latest',
commitOffsetsOnFirstJoin: true,
outOfRangeOffset: 'earliest',
onRebalance: (isAlreadyMember, callback) => { callback(); }
}
var consumerGroup = new ConsumerGroup(Object.assign({ id: 'demo-' + process.pid }, options), 'example')
consumerGroup.on('message', function (message) {
message.value = JSON.parse(message.value)
console.log('Message Received')
})
}
I want that, if I pass zookeeper IP address in ConsumerGroup rather then Kafka-Host IP address, it should receive messages which were sent from Producer API on "example" topic. And if one broker fails then it should receive messages from another broker. As replication factor is set to 3 and 3 brokers were created.
Okay, the problem was with the consumerGroup options object.
We have to pass zookeeper IP address in "host" key in options object instead of "kafkaHost" key. This solves the problem and receives all data sent by Producer API. And even automatically switch over to another replica set if one replica set fails.
var options = {
kafkaHost: '127.0.0.1:9092',
batch: undefined,
ssl: true,
groupId: 'demoExample',
protocol: ['roundrobin'],
encoding: 'utf8',
fromOffset: 'latest',
commitOffsetsOnFirstJoin: true,
outOfRangeOffset: 'earliest',
onRebalance: (isAlreadyMember, callback) => { callback(); }
}
The following block of code fix it.
var options = {
host: '127.0.0.1:2181', // change in key & value
batch: undefined,
ssl: true,
groupId: 'demoExample',
protocol: ['roundrobin'],
encoding: 'utf8',
fromOffset: 'latest',
commitOffsetsOnFirstJoin: true,
outOfRangeOffset: 'earliest',
onRebalance: (isAlreadyMember, callback) => { callback(); }
}
Thanks.