Tags: node.js, typescript, apache-kafka, kafkajs

How can I make my KafkaJS consumer wait indefinitely for a broker to come online?


I have a KafkaJS consumer configured as follows:

// Create the kafka client
const kafka = new Kafka({
  clientId,
  brokers,
});

// Create the consumer
const consumer = kafka.consumer({
  groupId,
  heartbeatInterval: 3000,
  sessionTimeout: 30000,
});

// Connect the consumer
consumer.connect().then(async () => {
  await consumer.run({
    eachMessage: async ({ topic, partition, message, heartbeat, pause }) => {
      subscriptionRegistrations[topic](topic, partition, message);
    },
  }).catch((err) => {
    console.log('Error running consumer!', err);
  });
}).catch((err) => {
  console.log('Error connecting consumer!', err);
});

Currently, when starting the application I get several connection errors like these:

{"level":"ERROR","timestamp":"2023-01-27T23:29:58.214Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection error: connect ECONNREFUSED 127.0.0.1:9092","retryCount":0,"retryTime":246}
{"level":"ERROR","timestamp":"2023-01-27T23:29:58.463Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 127.0.0.1:9092","broker":"localhost:9092","clientId":"CLIENT_ID_TEST","stack":"Error: connect ECONNREFUSED 127.0.0.1:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (node:net:1157:16)"}

The above errors are expected, as I am not running a broker. They are then followed by:

KafkaJSNonRetriableError
  Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED 127.0.0.1:9092
    at Socket.onError (/path/to/project/node_modules/kafkajs/src/network/connection.js:210:23)
    at Socket.emit (node:events:526:28)
    at emitErrorNT (node:internal/streams/destroy:157:8)
    at emitErrorCloseNT (node:internal/streams/destroy:122:3)
    at processTicksAndRejections (node:internal/process/task_queues:83:21)
[ERROR] 23:29:59 KafkaJSNumberOfRetriesExceeded: Connection error: connect ECONNREFUSED 127.0.0.1:9092

At this point the application hangs; it doesn't crash. I also get the error I logged in the catch block of consumer.connect():

Error connecting consumer KafkaJSNonRetriableError
  Caused by: KafkaJSConnectionError: Connection error: connect ECONNREFUSED 127.0.0.1:9092
    at Socket.onError (/path/to/project/node_modules/kafkajs/src/network/connection.js:210:23)
    ... 3 lines matching cause stack trace ...
    at processTicksAndRejections (node:internal/process/task_queues:83:21) {
  name: 'KafkaJSNumberOfRetriesExceeded',
  retriable: false,
  helpUrl: undefined,
  retryCount: 5,
  retryTime: 3636,
  [cause]: KafkaJSConnectionError: Connection error: connect ECONNREFUSED 127.0.0.1:9092
      at Socket.onError (/path/to/project/node_modules/kafkajs/src/network/connection.js:210:23)
      at Socket.emit (node:events:526:28)
      at emitErrorNT (node:internal/streams/destroy:157:8)
      at emitErrorCloseNT (node:internal/streams/destroy:122:3)
      at processTicksAndRejections (node:internal/process/task_queues:83:21) {
    retriable: true,
    helpUrl: undefined,
    broker: 'localhost:9092',
    code: 'ECONNREFUSED',
    [cause]: undefined
  }
}

I would like my application to be able to start before the Kafka broker it is configured to connect to, with KafkaJS retrying the connection indefinitely until a broker becomes available. Ideally this would also work as a reconnection strategy: if the broker goes down, KafkaJS would keep trying to reconnect until the broker comes back online. From what I have read in the docs, this is how it is supposed to behave, but that's not what I'm seeing. Perhaps I have set up my client incorrectly. Thank you for the input!


Solution

  • I'd recommend using AdminClient instead to "ping" the cluster, first.
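    One way to sketch that "ping" approach is a small polling helper that keeps calling a ping function until it succeeds. `waitForBroker` below is a hypothetical helper, not part of KafkaJS; in practice the ping would be a cheap admin call such as `admin.listTopics()`.

```javascript
// Poll a "ping" function until it resolves, waiting `intervalMs` between attempts.
// With KafkaJS the ping could be (assuming an existing `kafka` client):
//   const admin = kafka.admin();
//   await admin.connect();
//   await admin.listTopics();
//   await admin.disconnect();
async function waitForBroker(ping, { intervalMs = 5000 } = {}) {
  for (;;) {
    try {
      await ping();
      return; // broker reachable
    } catch (err) {
      console.log(`Broker not reachable yet (${err.message}), retrying in ${intervalMs}ms`);
      await new Promise((resolve) => setTimeout(resolve, intervalMs));
    }
  }
}
```

    Once `waitForBroker` resolves, you can connect the consumer knowing the cluster is reachable.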

    But, you can retry the connection after catching the KafkaJSNumberOfRetriesExceeded error.

    // Error classes such as KafkaJSNonRetriableError are exported by
    // recent versions of kafkajs.
    const { Kafka, KafkaJSNonRetriableError } = require('kafkajs');

    const consumer = ...
    while (true) {
      try {
        await consumer.connect();
        break;
      } catch (err) {
        console.log('Error connecting consumer!', err);
        if (err instanceof KafkaJSNonRetriableError && err.name === 'KafkaJSNumberOfRetriesExceeded') {
          console.log('retrying connection...');
          continue;
        }
        console.error('unknown error ' + err);
        break;
      }
    }
    await consumer.run() ...
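    Alternatively, you can raise KafkaJS's client-level `retry` settings so the built-in retrier keeps trying much longer before it surfaces `KafkaJSNumberOfRetriesExceeded` (the default is 5 retries, which matches the `retryCount: 5` in the error above). A sketch; the values here are illustrative, not recommendations:

```javascript
const { Kafka } = require('kafkajs');

// Client-level retry configuration, applied to connection attempts
// (and other retriable operations) before the error above is thrown.
const kafka = new Kafka({
  clientId: 'my-app',            // hypothetical client id
  brokers: ['localhost:9092'],   // hypothetical broker list
  retry: {
    initialRetryTime: 300, // ms before the first retry
    maxRetryTime: 30000,   // cap on the exponential backoff
    retries: 100,          // default is 5
  },
});
```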
    

    Ideally this would also work as a reconnection strategy: if the broker goes down, KafkaJS would continue to try to reconnect indefinitely until the broker comes back online

    Or... you run more than one broker, and therefore have a highly available Kafka cluster that clients can connect to.


    If you ran your code in Docker/Kubernetes with a restart policy, you could just let the process crash, and a new container would be started on its own. You could implement the same with supervisord.