Search code examples
javascript, node.js, rabbitmq, queue, node-amqplib

Priority based queue


I am trying to implement a queue-based solution for our Discord-related tasks, which use the Discord API. I need some input on how this can be implemented.

Here are the requirements:

  1. Due to the global rate limit, only one task can be performed at a time.
  2. Tasks have two priority levels: high and low.
  3. High-priority tasks are always picked first after completing the current task (if there are any).

Here's what I've tried: I used amqplib and RabbitMQ with a direct exchange, but I am unable to pause one queue based on the state of the other.

const amqp = require('amqplib');
/**
 * Declares the RabbitMQ topology used by the task queues, then disconnects.
 *
 * Creates the durable direct exchange `priority_exchange`, the durable queues
 * `high_priority_queue` (declared with `x-max-priority: 10`) and
 * `low_priority_queue`, and binds them with the routing keys `high` and `low`.
 *
 * Failures are logged rather than thrown, so the returned promise does not
 * reject on a setup or shutdown error.
 *
 * @returns {Promise<void>} Resolves after the channel and connection have been closed.
 */
async function setup() {
  let connection;
  let channel;

  try {
    // Connect to the RabbitMQ server
    connection = await amqp.connect('amqp://localhost');

    // Create a channel
    channel = await connection.createChannel();

    // Assert an exchange of type 'direct' that is durable
    await channel.assertExchange('priority_exchange', 'direct', {
      durable: true,
    });

    // Assert queues that are durable
    await channel.assertQueue('high_priority_queue', {
      durable: true,
      arguments: {
        'x-max-priority': 10, // Ensure this matches the existing queue configuration
      },
    });
    await channel.assertQueue('low_priority_queue', {
      durable: true,
    });

    // Bind the queues to the exchange with specific routing keys
    await channel.bindQueue('high_priority_queue', 'priority_exchange', 'high');
    await channel.bindQueue('low_priority_queue', 'priority_exchange', 'low');

    console.log('Setup complete.');
  } catch (error) {
    console.error('Setup failed:', error);
  } finally {
    // Close each resource independently. A failed assert (for example a
    // queue-argument mismatch) can leave the channel already closed, and a
    // failing channel.close() must not prevent the connection from closing.
    if (channel) {
      try {
        await channel.close();
      } catch (closeError) {
        console.error('Failed to close channel:', closeError);
      }
    }
    if (connection) {
      try {
        await connection.close();
      } catch (closeError) {
        console.error('Failed to close connection:', closeError);
      }
    }
  }
}

/**
 * Starts consuming tasks from both task queues over a single channel.
 *
 * Declares `high_priority_queue` (with `x-max-priority: 10`) and
 * `low_priority_queue`, then registers one consumer per queue. Each message
 * is logged, processed with `fakePromise()`, and acknowledged on success. A
 * failed task is logged and returned to its queue.
 *
 * The prefetch limit is applied channel-wide so that the two consumers
 * together hold at most one unacknowledged message, i.e. only one task runs
 * at a time.
 *
 * NOTE(review): this does not by itself guarantee that a waiting high-priority
 * message is delivered before a waiting low-priority one; strict ordering
 * needs a single priority queue with per-message priorities. Also confirm the
 * target broker version still accepts a global prefetch.
 *
 * @returns {Promise<void>} Resolves once both consumers are registered.
 */
async function consumer() {
  const connection = await amqp.connect('amqp://localhost');
  const channel = await connection.createChannel();

  await channel.assertQueue('high_priority_queue', {
    durable: true,
    arguments: {
      'x-max-priority': 10,
    },
  });
  await channel.assertQueue('low_priority_queue', {
    durable: true,
  });

  // Without the global flag RabbitMQ applies the limit per consumer, which
  // would let each of the two consumers below hold a message concurrently.
  await channel.prefetch(1, true);

  // Builds the message handler for one queue; both queues share the same flow.
  const handlerFor = (queueName) => async (msg) => {
    // A null message means the broker cancelled this consumer.
    if (!msg) {
      return;
    }

    console.log(`Received ${msg.content.toString()} from ${queueName}`);

    try {
      await fakePromise();
    } catch (error) {
      console.error(`Failed to process message from ${queueName}:`, error);
      // Requeue so the task is not lost and the prefetch slot is released.
      channel.nack(msg, false, true);
      return;
    }

    channel.ack(msg);
  };

  // Await registration so a failure surfaces to the caller of consumer().
  await channel.consume(
    'high_priority_queue',
    handlerFor('high_priority_queue'),
    { noAck: false }
  );
  await channel.consume(
    'low_priority_queue',
    handlerFor('low_priority_queue'),
    { noAck: false }
  );
}

// Start consuming; a rejection from consumer() is reported on stderr.
consumer().catch((startupError) => {
  console.error(startupError);
});

/**
 * Simulates an asynchronous task by waiting for a fixed delay.
 *
 * @param {number} [delayMs=5000] - How long to wait, in milliseconds.
 * @returns {Promise<void>} Resolves with no value once the delay has elapsed.
 */
function fakePromise(delayMs = 5000) {
  return new Promise((resolve) => {
    setTimeout(resolve, delayMs);
  });
}


Solution

  • If you're fine with classic queues, you can make use of a priority queue.

    For this to work, apart from setting the priority field of basic.properties on each message, you need to make sure that there is a single consumer and that its QoS (prefetch count, i.e. the maximum number of unacknowledged messages it can be processing at a time) is set to 1.

    That way any of the messages in the queue that are not currently assigned to a consumer will be prioritized based on the priority provided, higher value first.

    It is worth noting that with this approach there is (to my knowledge) no way to make a high-priority message preempt a lower-priority message that is already being processed.