Consider this test: the test sends a message to topic 'out', and the code under test is expected to consume it and reply by sending a message to topic 'in'. For the test to pass, I need to verify that a message was actually sent to topic 'in'.
// Test sketch: publish one message to topic 'out', then (the open question) wait
// until a message is observed on topic 'in' before the test is allowed to finish.
it('...', async () => {
/* initialize kafkaConsumer and kafkaProducer here */
// Handler registered below as the consumer's eachMessage callback.
async function someCallback() {
// ...
}
// Listen on topic 'in', reading from the beginning of the topic.
await kafkaConsumer.subscribe({ topic: 'in', fromBeginning: true })
// NOTE(review): presumably run() resolves once consumption has started, not once
// someCallback has actually been invoked — confirm against the kafkajs docs.
await kafkaConsumer.run({ eachMessage: someCallback })
// Publish the message on topic 'out'.
await kafkaProducer.send({ topic: 'out', messages: [{ key: '1', value: '2' }] })
// How do I block here until someCallback is called?
})
I read about using the `done` callback, but I can't use it while the test function itself is declared `async`, which I need in order to use `await`. Is there a different way I'm not aware of?
After some time with Tommy Brunn's answer, I found a few bugs and I ended up with this:
/**
 * Consumes messages from `topic` until `messagesAmount` of them have been
 * collected, then disconnects the consumer and resolves with those messages.
 *
 * Offsets are committed manually, one message at a time, so that only the
 * messages actually returned are marked as consumed for `groupId`.
 *
 * The returned promise stays pending if fewer than `messagesAmount` messages
 * are ever delivered; callers should rely on their test runner's timeout.
 *
 * @param kafka - Client used to create the consumer.
 * @param messagesAmount - Number of messages to wait for.
 * @param topic - Topic to subscribe to.
 * @param fromBeginning - Forwarded to `consumer.subscribe`.
 * @param groupId - Consumer group id used for the temporary consumer.
 * @returns The first `messagesAmount` messages delivered, in delivery order,
 *   or an empty array when `messagesAmount` is not a positive number.
 * @throws Rejects with any error raised while connecting, subscribing,
 *   starting the consumer, committing an offset, or disconnecting.
 */
export const waitForKafkaMessages = async (
  kafka: Kafka,
  messagesAmount: number,
  topic: string,
  fromBeginning: boolean,
  groupId: string,
): Promise<KafkaMessage[]> => {
  // Nothing to wait for (this also catches NaN): avoid joining the group at all,
  // otherwise the promise could only settle once some unrelated message arrives.
  if (!(messagesAmount > 0)) {
    return []
  }

  const consumer: Consumer = kafka.consumer({ groupId })
  await consumer.connect()

  try {
    await consumer.subscribe({ topic, fromBeginning })

    const messages: KafkaMessage[] = []

    return await new Promise<KafkaMessage[]>((resolve, reject) => {
      consumer
        .run({
          autoCommit: false,
          eachMessage: async ({ message, partition, topic: messageTopic }) => {
            // eachMessage is called by eachBatch which can consume more than messagesAmount.
            // This is why we manually commit only messagesAmount messages.
            if (messages.length >= messagesAmount) {
              return
            }
            try {
              messages.push(message)
              // +1 because we need to commit the next assigned offset.
              // Offsets are 64-bit integers serialized as strings, so the arithmetic
              // is done with BigInt; Number would lose precision above 2^53.
              const nextOffset = (BigInt(message.offset) + BigInt(1)).toString()
              await consumer.commitOffsets([{ topic: messageTopic, partition, offset: nextOffset }])
              if (messages.length >= messagesAmount) {
                // I think we should be able to close the connection here, but kafkajs has a bug
                // which makes it hang if consumer.disconnect is called too soon after consumer.run .
                // This is why we close it in the finally block below.
                resolve(messages)
              }
            } catch (e) {
              reject(e)
            }
          },
        })
        // A failure to start the consumer must settle the promise too, otherwise
        // the caller would wait forever and the consumer would never be disconnected.
        .catch(reject)
    })
  } finally {
    // Runs on success and on every failure after connect, so the consumer never leaks.
    await consumer.disconnect()
  }
}