Search code examples
dockerapache-kafkaapache-kafka-connectdebeziumkafkajs

NodeJS : KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members


I am trying to capture data from Kafka using MongoDB debezium connector but I am getting error when I try to read it with KafkaJS:

KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members

I am using docker images to capture data.

Here are the steps, I am following :

  1. Start Zookeeper

    docker run -it --rm --name zookeeper -p 2181:2181 -p 2888:2888 -p 3888:3888 debezium/zookeeper:latest
    
  2. start kafka

    docker run -it --rm --name kafka -p 9092:9092 --link zookeeper:zookeeper debezium/kafka:latest
    
  3. I have MongoDB running with replicate mode already

  4. Start debezium Kafka connect

    docker run -it --rm --name connect -p 8083:8083 -e GROUP_ID=1 -e CONFIG_STORAGE_TOPIC=my_connect_configs -e OFFSET_STORAGE_TOPIC=my_connect_offsets -e STATUS_STORAGE_TOPIC=my_connect_statuses --link zookeeper:zookeeper --link kafka:kafka  debezium/connect:latest
    
  5. Then Post MongoDB connector configuration

    curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" localhost:8083/connectors/ -d '{ "name": "mongodb-connector", "config": { "connector.class": "io.debezium.connector.mongodb.MongoDbConnector", "mongodb.hosts": "rs0/abc.com:27017", "mongodb.name": "fullfillment", "collection.whitelist": "mongodev.test", "mongodb.user": "kafka", "mongodb.password": "kafka01" } }'
    
  6. With this If I run a watcher docker container, I am able to data in Json format in console

    docker run -it --name watchermongo --rm --link zookeeper:zookeeper --link kafka:kafka debezium/kafka:0.9 watch-topic -a -k fullfillment.mongodev.test
    

but I want to capture this data in application so that I can manipulate it, process it and push to ElasticSearch. For that I am using

https://github.com/tulios/kafkajs 

But When I run the consumer code, I am getting error.. Here is code example

//'use strict';






// clientId=connect-1, groupId=1

const { Kafka } = require('kafkajs')



const kafka = new Kafka({

  clientId: 'connect-1',

  brokers: ['localhost:9092', 'localhost:9093']

})


// Consuming

const consumer = kafka.consumer({ groupId: '1' })



var consumeMessage = async () => {



await consumer.connect()

await consumer.subscribe({ topic: 'fullfillment.mongodev.test' })



await consumer.run({

  eachMessage: async ({ topic, partition, message }) => {

    console.log({

      value: message.value.toString(),

    })

  },

})



}



consumeMessage();


KafkaJSProtocolError: The group member's supported protocols are incompatible with those of existing members

Solution

  • You should not be using the same groupId in both Connect and your KafkaJS consumer. If you do, they will be part of the same consumer group, which means that messages would only be consumed by one or the other, if it even worked at all.

    If you change the groupId of your KafkaJS consumer to something unique, it should work.

    Note that by default a new KafkaJS consumer group will start consuming from the latest offset, so it won't consume already produced messages. You can override this behavior with the fromBeginning flag in the consumer.subscribe call. See https://kafka.js.org/docs/consuming#from-beginning