Tags: node.js, jestjs, kafkajs

How to wait for a Kafka response in a Jest test with KafkaJS?


Consider this test: it sends a message to topic 'out', and the code under test is expected to consume that message and reply by sending a message to topic 'in'. The test should pass only if a message actually arrives on topic 'in'.

it('...', async () => {
  /* initialize kafkaConsumer and kafkaProducer here */

  async function someCallback() {
    // ...
  }

  await kafkaConsumer.subscribe({ topic: 'in', fromBeginning: true })
  await kafkaConsumer.run({ eachMessage: someCallback })

  await kafkaProducer.send({ topic: 'out', messages: [{ key: '1', value: '2' }] })

  // How do I block here until someCallback is called?
})

I read about using done, but I can't use it while the test function itself is async, which I need in order to use await. Is there another way that I'm not aware of?


Solution

  • After working with Tommy Brunn's answer for a while, I found a few bugs in it and ended up with this:

    import { Consumer, Kafka, KafkaMessage } from 'kafkajs'

    export const waitForKafkaMessages = async (
      kafka: Kafka,
      messagesAmount: number,
      topic: string,
      fromBeginning: boolean,
      groupId: string,
    ): Promise<KafkaMessage[]> => {
      const consumer: Consumer = kafka.consumer({ groupId })
      await consumer.connect()
      await consumer.subscribe({ topic, fromBeginning })
    
      let resolveOnConsumption: (messages: KafkaMessage[]) => void
      let rejectOnError: (e: Error) => void
    
      const returnThisPromise = new Promise<KafkaMessage[]>((resolve, reject) => {
        resolveOnConsumption = resolve
        rejectOnError = reject
      }).finally(() => consumer.disconnect()) // disconnect here; the comment inside eachMessage below explains why
    
      const messages: KafkaMessage[] = []
      await consumer.run({
        autoCommit: false,
        eachMessage: async ({ message, partition, topic }) => {
          try {
            // eachMessage is called by eachBatch which can consume more than messagesAmount.
            // This is why we manually commit only messagesAmount messages.
            if (messages.length < messagesAmount) {
              messages.push(message)
    
              // +1 because we need to commit the next assigned offset.
              await consumer.commitOffsets([{ topic, partition, offset: (Number(message.offset) + 1).toString() }])
            }
    
            if (messages.length === messagesAmount) {
              // I think we should be able to disconnect right here, but kafkajs seems to have a bug
              // that makes it hang if consumer.disconnect is called too soon after consumer.run.
              // This is why the consumer is disconnected in the promise's finally block instead.
    
              resolveOnConsumption(messages)
            }
          } catch (e) {
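            // Under strict TypeScript settings `e` is typed unknown, so normalize it to an Error first.
            rejectOnError(e instanceof Error ? e : new Error(String(e)))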
          }
        },
      })
    
      return returnThisPromise
    }
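
The core idea in the helper above is independent of Kafka: create a promise, keep its resolve function, call it from inside the message callback, and let the async test await the promise. Below is a minimal sketch of just that pattern, so it can be run without a broker. `createCollector`, `fakeReply` and `demo` are hypothetical names made up for this illustration; they are not part of KafkaJS or Jest, and `fakeReply` only stands in for the code under test.

```typescript
// Hypothetical helper (not a KafkaJS API): turns a push-style callback into
// a promise that resolves once `count` items have been received.
interface Collector<T> {
  onMessage: (item: T) => void
  done: Promise<T[]>
}

function createCollector<T>(count: number): Collector<T> {
  const received: T[] = []
  // The promise executor runs synchronously, so this is assigned before use.
  let resolveDone!: (items: T[]) => void

  const done = new Promise<T[]>((resolve) => {
    resolveDone = resolve
  })

  const onMessage = (item: T): void => {
    // Ignore anything delivered beyond the requested amount.
    if (received.length >= count) return
    received.push(item)
    if (received.length === count) resolveDone([...received])
  }

  return { onMessage, done }
}

// Stand-in for the code under test: delivers each value on a later tick,
// the way a consumer callback fires some time after the producer sends.
function fakeReply(deliver: (value: string) => void, values: string[]): void {
  for (const value of values) setTimeout(() => deliver(value), 0)
}

async function demo(): Promise<string[]> {
  const collector = createCollector<string>(2)
  fakeReply(collector.onMessage, ['a', 'b', 'c'])
  // The async function simply awaits here; no `done` callback is needed.
  return collector.done // resolves to ['a', 'b']; 'c' is ignored
}
```

In the question's test this should translate to creating the collector with `createCollector<KafkaMessage>(1)`, passing `async ({ message }) => collector.onMessage(message)` as `eachMessage`, and ending the test with `await collector.done` after `kafkaProducer.send(...)`. Note that this sketch never rejects, so if no reply arrives the test fails only when Jest's own test timeout expires.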