Search code examples
node.jspromisekafkajs

Why does Promise.race not resolve in kafkajs eachMessage callback


I have defined a promise like this ...

    const result = await Promise.race([
      new Promise(resolve => {
        consumer.run({
          eachMessage: ({ message }) => {
            const data = JSON.parse(message.value.toString());
            if (data.payload.template
              && data.payload.template.id === '...'
              && data.payload.to[0].email === email) {
              console.log('Should resolve!')
              resolve(data.payload.template.variables.link);
              console.log('resolved');

              consumer.pause();
              consumer.disconnect();
            }
          },
        });
      }),
      new Promise((_, reject) => setTimeout(reject, 3000))
    ]);
    console.log('result is ', result);
    return result;

I can get to resolved but it doesnt print the result at the end, it seems like neither did the timeout nor the actual promise work as expected? Why is that? I suspect its something to do with using resolve inside the kafka js callback?


UPDATE: Seems like its Promise.race() thats not resolving, but why?


Solution

  • My suspicion is that your "success-side" promise inadvertently throws and you're swallowing the error silently.

    Using a mock-up minimal implementation of the consumer (that succeeds or fails 50/50), the following code works.

    Run the code sample a couple of times to see both cases.

    var consumer = {
      interval: null,
      counter: 0,
      run: function (config) {
        this.interval = setInterval(() => {
          this.counter++;
          console.log(`Consumer: message #${this.counter}`);
          config.eachMessage({message: this.counter});
        }, 250);
      },
      pause: function () {
        console.log('Consumer: paused');
        clearInterval(this.interval);
      },
      disconnect: function () {
        console.log('Consumer: disconnected');    
      }
    };
    
    Promise.race([
      new Promise(resolve => {
        const expectedMsg = Math.random() < 0.5 ? 3 : 4;
        consumer.run({
          eachMessage: ({ message }) => {
            if (message === expectedMsg) resolve("success");
          }
        });
      }),
      new Promise((_, reject) => setTimeout(() => {
        reject('timeout');
        consumer.pause();
        consumer.disconnect();
      }, 1000))
    ]).then((result) => {
      console.log(`Result: ${result}`);
    }).catch((err) => {
      console.log(`ERROR: ${err}`);
    });

    I have also moved consumer.pause() and consumer.disconnect() to the "timeout-side" promise, this way the consumer is guaranteed to disconnect, albeit it might run a tiny bit longer than necessary in the success case.