Tags: node.js, apache-kafka, strimzi, kafkajs

Kafka consumer subscribers -- member assignments being lost (nodejs)


Synopsis

We have a subscriber that uses a single consumer group to respond to events on 5 topics, each of which has a single partition. When we fire events into these topics, our code handles the first two events and then nothing more. On startup the consumer group has assignments for all 5 topics, but after the second event is handled, all of the member assignments disappear.

Code

Here is the subscription code:

  // Create the consumer for this service's group and connect it to the cluster.
  connectoToConsumer: async () => {
    const groupId = `${Group}-adaptor`;
    log.debug({ msg: `Connecting to event consumer...`, groupId });
    _app.connection.consumer = _app.kafka.consumer({ groupId });
    await _app.connection.consumer.connect();
  },

  // Subscribe to each topic in a comma-separated list. All subscriptions
  // must be made before runConsumer() is called.
  subscribe: async topics => {
    log.debug("Subscribing to many topics...");
    for (const topic of topics.split(",")) {
      await _app.subscribeConsumerToTopic(topic);
    }
  },

  // Subscribe to a single topic; any failure here is treated as fatal.
  subscribeConsumerToTopic: async topic => {
    log.debug({ msg: "Subscribing to topic", topic });
    try {
      await _app.connection.consumer.subscribe({ topic: topic, fromBeginning: false });
    } catch (e) {
      log.error({ msg: `FATAL ERROR: Event adaptor consumer connection failed`, type: "error", subtype: "fatal", source: "registerConsumer", cause: e });
      process.kill(process.pid, "SIGINT");
    }
  },

  // Start the fetch loop. Offsets are committed every 100 ms or after 100
  // resolved messages, whichever condition is met first.
  runConsumer: async () => {
    await _app.connection.consumer.run({
      autoCommitInterval: 100,
      autoCommitThreshold: 100,
      // Only the message is used; the payload's topic and partition are ignored.
      eachMessage: async ({ message: event }) => handler(event)
    });
  },

To invoke this we use:

  await _app.connectoToConsumer();
  await _app.subscribe(topics);
  await _app.runConsumer();
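The `subscribe` step splits the topic string on commas and subscribes one topic at a time. As a minimal sketch, a hypothetical `parseTopics` helper could normalize that string first (trimming stray spaces, dropping empty entries, removing duplicates), since a name such as `" service-invoice.approved.1"` with a leading space would not match the real topic. This is an illustration rather than a diagnosis of the lost assignments; the commented usage assumes KafkaJS 2.x, whose `subscribe` also accepts a `topics` array, and a hypothetical `TOPICS` environment variable.

```javascript
// Hypothetical helper: turn a comma-separated topic list into clean names.
// Trims whitespace, drops empty entries (e.g. from "a,,b" or a trailing comma),
// and removes duplicates while keeping the original order.
function parseTopics(csv) {
  const seen = new Set();
  const topics = [];
  for (const raw of String(csv).split(",")) {
    const topic = raw.trim();
    if (topic.length > 0 && !seen.has(topic)) {
      seen.add(topic);
      topics.push(topic);
    }
  }
  return topics;
}

// Usage sketch (not executed here; TOPICS is a hypothetical env variable).
// With KafkaJS 2.x all topics can be subscribed in one call before run():
//   await consumer.subscribe({ topics: parseTopics(process.env.TOPICS), fromBeginning: false });

console.log(parseTopics(" service-invoice.created.1, service-invoice.approved.1,,service-invoice.created.1 "));
```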

Output

When we run the consumer, we see this log message from KafkaJS:

{
  "level": "INFO",
  "timestamp": "2022-04-21T11:23:30.186Z",
  "logger": "kafkajs",
  "message": "[ConsumerGroup] Consumer has joined the group",
  "groupId": "service-invoice.projection.1-adaptor",
  "memberId": "ctx-service-invoice.projection.1-adaptor-f3ff61a9-9d21-4aeb-9560-ba415188cacd-3914c6d0-6674-4342-8a45-8114c3cd6b62",
  "leaderId": "ctx-service-invoice.projection.1-adaptor-f3ff61a9-9d21-4aeb-9560-ba415188cacd-3914c6d0-6674-4342-8a45-8114c3cd6b62",
  "isLeader": true,
  "memberAssignment": {
    "service-invoice.created.1": [
      0
    ],
    "service-invoice.approved.1": [
      0
    ],
    "service-invoice.service-orders-created.1": [
      0
    ],
    "service-invoice.service-orders-invoiced.1": [
      0
    ],
    "service-invoice.customer-changed.1": [
      0
    ]
  },
  "groupProtocol": "RoundRobinAssigner",
  "duration": 23555
}

This shows that we have successfully subscribed to all 5 topics. We expect to see those 5 events in the order listed above. However, after the approved event, no further events are handled, and when we look at Kafka, all the members have gone. There is no exception or other log message suggesting an error in our code.
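The `memberAssignment` in that log line maps each topic to the partitions this member owns. A small sketch that flattens such an object into `topic/partition` strings makes it easy to log the assignment on every rebalance and compare it over time. The name `flattenAssignment` is hypothetical; the commented wiring assumes KafkaJS's `consumer.events.GROUP_JOIN` instrumentation event, whose payload carries the same `memberAssignment` field seen in the log.

```javascript
// Flatten a KafkaJS-style memberAssignment ({ topic: [partitions] }) into a
// sorted list of "topic/partition" strings for logging or comparison.
function flattenAssignment(memberAssignment) {
  const pairs = [];
  for (const [topic, partitions] of Object.entries(memberAssignment || {})) {
    for (const partition of partitions) {
      pairs.push(`${topic}/${partition}`);
    }
  }
  return pairs.sort();
}

// The assignment from the "Consumer has joined the group" log line.
const assignment = {
  "service-invoice.created.1": [0],
  "service-invoice.approved.1": [0],
  "service-invoice.service-orders-created.1": [0],
  "service-invoice.service-orders-invoiced.1": [0],
  "service-invoice.customer-changed.1": [0],
};

console.log(flattenAssignment(assignment).length); // 5

// Wiring sketch (not executed here):
//   consumer.on(consumer.events.GROUP_JOIN, e =>
//     log.info({ assigned: flattenAssignment(e.payload.memberAssignment) }));
```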

Incidentally, our handlers run very fast. They make a local HTTP call to another container in the same pod, which updates a single field in a single document in MongoDB. The handlers typically run and respond in under 500 ms.

Suspicions

As JavaTechnical suggests in the answer below, we believe this is related to the heartbeat mechanism. The Kafka broker logs show this:

2022-04-21 13:17:56,941 INFO [GroupCoordinator 0]: Member ctx-service-invoice.projection.1-adaptor-4b32ab43-17f2-4b98-84ed-b21ee36b771f-d6040fbd-2fb8-4025-9fdd-5fb8fb6b41a1 in group service-invoice.projection.1-adaptor has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator) [executor-Heartbeat]

We're using Strimzi to run Kafka in Kubernetes, and we see these settings in the logs:

broker.heartbeat.interval.ms = 2000
broker.session.timeout.ms = 9000
group.max.session.timeout.ms = 1800000
group.min.session.timeout.ms = 6000
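Two of these settings are probably a red herring: as we understand it, `broker.heartbeat.interval.ms` and `broker.session.timeout.ms` govern heartbeats between brokers and the controller quorum in KRaft mode, not consumer-group membership. For consumers, the broker only requires the client's requested session timeout to fall between `group.min.session.timeout.ms` and `group.max.session.timeout.ms`; the timeout itself and the heartbeat interval come from the client (KafkaJS defaults: `sessionTimeout` 30000 ms, `heartbeatInterval` 3000 ms). The hypothetical `checkSessionConfig` sketch compares a KafkaJS consumer config against those bounds; the one-third guideline for the heartbeat interval is borrowed from Kafka's documentation for the Java client's `heartbeat.interval.ms`.

```javascript
// Documented KafkaJS consumer defaults (milliseconds).
const KAFKAJS_DEFAULTS = { sessionTimeout: 30000, heartbeatInterval: 3000 };

// Compare a KafkaJS consumer's session settings with the broker's
// group.min/max.session.timeout.ms bounds; returns a list of problems.
function checkSessionConfig(consumerOptions, broker) {
  const { sessionTimeout, heartbeatInterval } = { ...KAFKAJS_DEFAULTS, ...consumerOptions };
  const problems = [];
  if (sessionTimeout < broker.groupMinSessionTimeoutMs || sessionTimeout > broker.groupMaxSessionTimeoutMs) {
    problems.push(
      `sessionTimeout ${sessionTimeout} ms is outside the broker range ` +
        `[${broker.groupMinSessionTimeoutMs}, ${broker.groupMaxSessionTimeoutMs}]`
    );
  }
  if (heartbeatInterval >= sessionTimeout) {
    problems.push(`heartbeatInterval ${heartbeatInterval} ms is not below sessionTimeout ${sessionTimeout} ms`);
  } else if (heartbeatInterval > sessionTimeout / 3) {
    problems.push(`heartbeatInterval ${heartbeatInterval} ms exceeds one third of sessionTimeout ${sessionTimeout} ms`);
  }
  return problems;
}

// Broker-side bounds taken from the Strimzi log output.
const broker = { groupMinSessionTimeoutMs: 6000, groupMaxSessionTimeoutMs: 1800000 };

// The config tried in the question: heartbeatInterval 1000, default sessionTimeout.
console.log(checkSessionConfig({ heartbeatInterval: 1000 }, broker)); // []
```

For the question's settings the check comes back empty, which suggests the session configuration itself is within the broker's limits.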

We think that consuming the first two messages is probably just luck: they arrive before the session timeout expires for lack of heartbeats.

What have we tried?

We tried setting the heartbeat interval in our KafkaJS code:

  _app.connection.consumer = _app.kafka.consumer({ groupId, heartbeatInterval: 1000 });

But that isn't helping.

  • The KafkaJS documentation says that consumers using eachMessage send heartbeats automatically, but given the nature of the error we've also tried passing the heartbeat function from the eachMessage payload to the handler and invoking it manually, again without success.

  • We've also tried extending the timeouts and heartbeat intervals through Strimzi, but those attempts failed (they crashed Strimzi), so we've backed them out.
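For reference, the manual-heartbeat pattern from the first bullet looks roughly like the sketch below. As we understand it, KafkaJS sends heartbeats from its own fetch and processing loop rather than from an independent timer, so a handler that runs for a long time delays them; calling the `heartbeat` function from the `eachMessage` payload on a timer keeps the session alive during such work. With handlers that finish in under 500 ms this is unlikely to change anything, which matches what the question reports. `withPeriodicHeartbeat` is a hypothetical helper, and the fake `heartbeat` in the demo stands in for the real one.

```javascript
// Run `work` while calling `heartbeat` every `intervalMs`, then stop the timer.
// `heartbeat` stands in for the function KafkaJS passes to eachMessage.
async function withPeriodicHeartbeat(work, heartbeat, intervalMs) {
  const timer = setInterval(() => {
    // Log heartbeat failures (sync or async) instead of leaving an unhandled rejection.
    Promise.resolve()
      .then(() => heartbeat())
      .catch(err => console.warn("heartbeat failed:", err && err.message));
  }, intervalMs);
  try {
    return await work();
  } finally {
    clearInterval(timer); // always stop heartbeating once the work settles
  }
}

// Usage inside consumer.run() (not executed here):
//   eachMessage: async ({ message, heartbeat }) =>
//     withPeriodicHeartbeat(() => handler(message), heartbeat, 1000)

// Demo with a fake heartbeat and 150 ms of simulated work.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));
let beats = 0;
withPeriodicHeartbeat(() => sleep(150).then(() => "done"), async () => { beats += 1; }, 30)
  .then(result => console.log(result, "after", beats, "heartbeats"));
```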

Question

Given all of this, we suspect that the KafkaJS heartbeats are not reaching the Kafka brokers in Kubernetes. How do we make sure that they do?
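One way to check whether heartbeats are being sent is KafkaJS's instrumentation events: `consumer.events.HEARTBEAT` fires on heartbeats, and `consumer.events.CRASH` reports errors that crash the consumer, which might explain a silent stop. The hypothetical `createHeartbeatMonitor` records the gap between successive heartbeat timestamps and warns when a gap approaches the session timeout. If the events stop entirely, the client has stopped heartbeating; steady events alongside broker-side removals would point somewhere else. This is a sketch, and the wiring comment assumes the event object's `timestamp` field.

```javascript
// Track gaps between successive heartbeat timestamps (milliseconds) and warn
// when a gap exceeds warnFraction of the session timeout.
function createHeartbeatMonitor(sessionTimeoutMs, warnFraction = 0.5) {
  let last = null;
  let maxGap = 0;
  return {
    record(timestamp) {
      if (last !== null) {
        const gap = timestamp - last;
        maxGap = Math.max(maxGap, gap);
        if (gap > sessionTimeoutMs * warnFraction) {
          console.warn(`heartbeat gap of ${gap} ms (session timeout ${sessionTimeoutMs} ms)`);
        }
      }
      last = timestamp;
    },
    maxGap: () => maxGap,
    // Time since the last recorded heartbeat, or null if none yet.
    msSinceLast: now => (last === null ? null : now - last),
  };
}

// Wiring sketch (not executed here):
//   const monitor = createHeartbeatMonitor(30000);
//   consumer.on(consumer.events.HEARTBEAT, e => monitor.record(e.timestamp));
//   consumer.on(consumer.events.CRASH, e => log.error({ msg: "consumer crashed", cause: e.payload.error }));
```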

Versions

We're using the latest Strimzi, Kafka, and KafkaJS on Node 17+.


Solution

  • When we check, on startup, the consumer group has 5 member assignments, but after handling the 2nd event all of the member assignments disappear.

    If the members are no longer in the group, they have been kicked out. This happens when your consumers cannot send heartbeats, either because they are down or because they take too long to send them. Possible ways to debug:

    1. Check whether there is a network issue between the consumers and the brokers.

    2. Check whether your handler takes longer than the configured session timeout.
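For the first check, a quick way to rule out basic connectivity from inside the pod is to open a TCP connection to the broker's bootstrap address. The `probeBroker` helper and the `my-cluster-kafka-bootstrap:9092` address are hypothetical (Strimzi names its bootstrap service `<cluster-name>-kafka-bootstrap`, and 9092 is the usual plain listener port). A successful connection only shows that the port is reachable; it says nothing about whether heartbeats are accepted.

```javascript
const net = require("net");

// Resolve true if a TCP connection to host:port opens within timeoutMs,
// false on any error (refused, DNS failure) or on timeout.
function probeBroker(host, port, timeoutMs = 3000) {
  return new Promise(resolve => {
    const socket = net.createConnection({ host, port });
    const finish = ok => {
      socket.destroy(); // we only want to know whether the port answers
      resolve(ok); // later calls are no-ops once the promise has settled
    };
    socket.setTimeout(timeoutMs, () => finish(false));
    socket.once("connect", () => finish(true));
    socket.on("error", () => finish(false));
  });
}

// Usage (hypothetical Strimzi bootstrap service, not executed here):
//   probeBroker("my-cluster-kafka-bootstrap", 9092).then(ok => console.log("reachable:", ok));
```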

    From the KafkaJS documentation:

    Be aware that the eachMessage handler should not block for longer than the configured session timeout or else the consumer will be removed from the group.

    If no heartbeats are received within this sessionTimeout, your consumer is kicked out of the group. The default is 30000 ms. Check whether you have modified it, or whether it is too short for what you do in your handler().
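To act on the second check, and to confirm the claim that handlers finish in under 500 ms, the handler can be timed directly. The hypothetical `timeHandler` wrapper measures each call and reports any that use more than a chosen fraction of the session timeout (30000 ms by default, matching the KafkaJS default). It is a sketch for diagnosis, not a fix.

```javascript
// Wrap an async message handler so each call is timed. Calls that take longer
// than warnFraction * sessionTimeoutMs are reported through onSlow.
function timeHandler(handler, { sessionTimeoutMs = 30000, warnFraction = 0.5, onSlow = console.warn } = {}) {
  return async (...args) => {
    const started = Date.now();
    try {
      return await handler(...args);
    } finally {
      // Runs for both successful and failing calls.
      const elapsedMs = Date.now() - started;
      if (elapsedMs > sessionTimeoutMs * warnFraction) {
        onSlow(`handler took ${elapsedMs} ms; session timeout is ${sessionTimeoutMs} ms`);
      }
    }
  };
}

// Usage inside consumer.run() (not executed here; `handler` is the question's own):
//   eachMessage: timeHandler(async ({ message }) => handler(message), { sessionTimeoutMs: 30000 })
```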