
kafkajs - gracefully stop kafkajs instance after disconnect

I'm using kafkajs both in production and in my integration tests.

Before all my tests, I create a kafkajs instance with a producer and a consumer (connect, subscribe, run with eachMessage). After all my tests, I want to gracefully stop everything in my node process, including the kafkajs components.

This is what I'm currently doing:

export function stopHelper(): Promise<void> {
    return new Promise<void>((resolve, reject) => {
        if (kafkaHelperState === kafkaHelperStateStatus.running) {
            kafkaHelperState = kafkaHelperStateStatus.stopping
            log.debug("stopHelper", kafkaHelperState);
            Promise.all([producer.disconnect, consumer.disconnect])
                .then(() => {
                    kafkaHelperState = kafkaHelperStateStatus.stopped
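                    log.debug("stopHelper", kafkaHelperState);
                    resolve()
                })
                .catch(error => reject(error))
        } else {
            log.warn("stopHelper", "kafkaHelper is not " + kafkaHelperStateStatus.running)
            // Nothing to stop; settle the promise so callers don't hang.
            resolve()
        }
    })
}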

The promises seem to work. I can see that my integration test suite finishes with both the producer and the consumer disconnected. But my node process keeps running without doing anything.

Before that, I was using kafka-node. When I stopped the consumer, my node process ended without my having to call process.exit(0).

Is there a way to gracefully destroy the kafkajs instance in the node process?


  • Promise.all([producer.disconnect(), consumer.disconnect()])

    instead of

    Promise.all([producer.disconnect, consumer.disconnect])
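
    To see why the parentheses matter: without them the array holds function references rather than promises. Promise.all treats non-promise values as already resolved, so it resolves right away and disconnect is never invoked. The connections then stay open, which would explain why the process keeps running. Below is a minimal sketch of that behaviour. It does not use kafkajs at all: Disconnectable, FakeClient, stopWithoutCalling, stopWithCalling and demo are hypothetical names made up for the illustration, and the only assumption about the real clients is that disconnect() returns a promise.

```typescript
// Minimal shape shared by the clients; hypothetical, for illustration only.
interface Disconnectable {
    disconnect(): Promise<void>;
}

// Hypothetical stand-in for a producer or consumer. It only records whether
// disconnect() was actually invoked.
class FakeClient implements Disconnectable {
    disconnected = false;

    async disconnect(): Promise<void> {
        this.disconnected = true;
    }
}

// Buggy variant: the array holds function references, not promises.
// Promise.all passes them through as plain values and never calls them.
async function stopWithoutCalling(clients: Disconnectable[]): Promise<void> {
    await Promise.all(clients.map(client => client.disconnect));
}

// Fixed variant: each disconnect() is invoked, and Promise.all waits for
// every returned promise to settle.
async function stopWithCalling(clients: Disconnectable[]): Promise<void> {
    await Promise.all(clients.map(client => client.disconnect()));
}

// Runs both variants on fresh fake clients and reports which clients were
// really disconnected afterwards.
async function demo(): Promise<{ withoutCall: boolean[]; withCall: boolean[] }> {
    const first = [new FakeClient(), new FakeClient()];
    await stopWithoutCalling(first);

    const second = [new FakeClient(), new FakeClient()];
    await stopWithCalling(second);

    return {
        withoutCall: first.map(client => client.disconnected),   // [false, false]
        withCall: second.map(client => client.disconnected),     // [true, true]
    };
}
```

    In the question's stopHelper, the equivalent change is to build the array from producer.disconnect() and consumer.disconnect(), so that the .then callback only runs once both clients have really closed their connections.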