javascript · rabbitmq · amqp · worker

Using AMQP to maintain a long-term connection to a remote worker


I'm trying to model the following scenario.

The server dispatches a 'START' action to a worker process via AMQP as follows (assume channel and action are supplied previously, and that action is START with some payload):

// Assumes amqplib's promise API; uuid comes from the 'uuid' package
// (e.g. const { v4: uuid } = require('uuid')), and this block runs inside
// a Promise executor that provides resolve and reject.
channel.assertQueue('', { exclusive: true }).then(({ queue }) => {
  const cId = uuid()

  // Listen on the exclusive reply queue for the worker's response.
  channel.consume(queue, (msg) => {
    if (msg.properties.correlationId === cId) {
      const response = JSON.parse(msg.content.toString())
      console.log('response', response)
      resolve(response)
    }
  }, { noAck: true })

  const msg = JSON.stringify(action)
  channel.sendToQueue(
    QUEUE_NAME,
    Buffer.from(msg), // new Buffer() is deprecated
    { correlationId: cId, replyTo: queue }
  )
}, reject)

The worker gets the START action along with a correlationId and replyTo queue name, adds the payload to its own internal list of things to do, and responds to the replyTo queue with a 'START_SUCCESS' action.

Now the worker works through its internal list of things to do and, as it goes, emits 'UPDATE' actions back to the server via the same replyTo queue. The server therefore needs to keep listening on that queue for updates, and it needs to know which worker is handling any specific task. The server is smart enough to know that a particular task has already started, so it will not dispatch it again.
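
For context, the worker side of this scheme might look roughly like the following. This is only a sketch, not the actual worker code: it assumes amqplib's promise API, the same QUEUE_NAME the server publishes to, and a hypothetical runTask helper that reports progress via a callback.

channel.consume(QUEUE_NAME, (msg) => {
  const action = JSON.parse(msg.content.toString())
  const { replyTo, correlationId } = msg.properties

  // Everything goes back on the replyTo queue the server created for this task.
  const reply = (payload) =>
    channel.sendToQueue(replyTo, Buffer.from(JSON.stringify(payload)), { correlationId })

  reply({ type: 'START_SUCCESS' })
  runTask(action.payload, (progress) => reply({ type: 'UPDATE', progress }))

  channel.ack(msg)
})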

But when it's time for the worker to stop doing the task, the server needs to know which worker to send a 'STOP' message to. Is there a way for the worker to send the server some sort of direct AMQP channel to itself that the server can use to send STOP messages?


Solution

  • The simplest answer would seem to be for the worker to create a "reply" queue of its own, send that queue's name to the server in the 'START_SUCCESS' message, and have the server store that state somewhere.
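
    A minimal sketch of that simpler approach, assuming amqplib's promise API (the controlQueue field, the task.controlQueue lookup, and the message shapes are illustrative, not from the question):

      // Worker: create a private control queue and advertise it in START_SUCCESS.
      const { queue: controlQueue } = await channel.assertQueue('', { exclusive: true })
      channel.consume(controlQueue, (msg) => { /* handle STOP */ }, { noAck: true })

      channel.sendToQueue(
        msg.properties.replyTo,
        Buffer.from(JSON.stringify({ type: 'START_SUCCESS', controlQueue })),
        { correlationId: msg.properties.correlationId }
      )

      // Server, later: send STOP directly to the queue it recorded for that task.
      channel.sendToQueue(task.controlQueue, Buffer.from(JSON.stringify({ type: 'STOP' })))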

    However, I think much of the power of RabbitMQ comes from the fact that messages aren't published directly to queues, but to exchanges, and their ultimate destination is determined by their routing key. (Publishing by queue name is actually via an exchange that uses the routing key as the queue name.) If you're not familiar with the different types of exchange, read through the RabbitMQ Getting Started tutorials.

    In this case, rather than thinking of the server and worker needing to know each other's identity, you can think in terms of them publishing and subscribing to each other's updates. If everything is published to an exchange, then the server and worker don't actually need to know anything about each other's identity.

    Here's how I see that working (a rough amqplib sketch follows the numbered steps):

    1. The server generates a unique ID for a particular job.
    2. The server publishes a START message to an exchange jobs.new, with a routing key classifying the type of job, and the job ID in the message.
    3. The server binds an anonymous queue to the direct or topic exchange jobs.status with the binding key set to the job ID.
    4. The worker starts up and takes one message from jobs.ready (or jobs.ready.some_type).
    5. The worker binds an anonymous queue to the jobs.control exchange with the job ID as the binding key.
    6. The worker starts the task, and publishes a START_SUCCESS message to the exchange jobs.status with the job ID as the routing key.
    7. The server receives the START_SUCCESS message from the queue it bound at step 3, and updates its state for that job.
    8. The worker periodically sends an UPDATE message to the jobs.status exchange; again, the routing key matches the job ID, so the server receives the message.
    9. When the server wants to stop (or modify) the running job, it publishes a STOP message to the jobs.control exchange with the job ID as the routing key.
    10. The worker receives this message on the queue bound at step 5, and stops the job.
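
    As a rough sketch of that flow with amqplib, inside an async function with a channel already open (exchange names as above; uuid is from the 'uuid' package, and payload, handleStatus and runJob are hypothetical):

      // Server: bind a status queue before publishing so START_SUCCESS cannot be missed.
      const jobId = uuid()
      const { queue: statusQueue } = await channel.assertQueue('', { exclusive: true })
      await channel.bindQueue(statusQueue, 'jobs.status', jobId)
      channel.consume(statusQueue, (msg) => handleStatus(JSON.parse(msg.content.toString())), { noAck: true })
      channel.publish('jobs.new', 'some_type', Buffer.from(JSON.stringify({ jobId, payload })))

      // Worker: take one job at a time, subscribe to its control messages, report status.
      channel.prefetch(1)
      channel.consume('jobs.ready.some_type', async (msg) => {
        const { jobId, payload } = JSON.parse(msg.content.toString())

        const { queue: controlQueue } = await channel.assertQueue('', { exclusive: true })
        await channel.bindQueue(controlQueue, 'jobs.control', jobId)
        channel.consume(controlQueue, (ctl) => { /* STOP or modify the running job */ }, { noAck: true })

        channel.publish('jobs.status', jobId, Buffer.from(JSON.stringify({ type: 'START_SUCCESS', jobId })))
        runJob(payload, (progress) =>
          channel.publish('jobs.status', jobId, Buffer.from(JSON.stringify({ type: 'UPDATE', jobId, progress }))))

        channel.ack(msg)
      })

      // Server, later: stop the job using the job ID alone as the routing key.
      channel.publish('jobs.control', jobId, Buffer.from(JSON.stringify({ type: 'STOP', jobId })))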

    Viewed from the RabbitMQ side, you have these elements (a declaration sketch follows this list):

    • 3 exchanges:
      • jobs.new where servers publish new jobs. This could be a simple fanout exchange if all workers can handle all jobs, or a topic exchange that routes jobs into different queues for different types of worker.
      • jobs.status where updates are published by workers. This would be a direct or topic exchange, whose routing keys are, or contain, the job ID.
      • jobs.control where updates are published by the server to control existing jobs. Again, this would be a direct or topic exchange, whose routing keys are, or contain, the job ID.
    • Permanent queues:
      • A single jobs.ready queue, or distinct jobs.ready.some_type queues, bound to the jobs.new exchange.
    • Anonymous queues:
      • One queue per job created by the server and bound to the jobs.status exchange using that job's ID. Alternatively, the server process could have a single queue for inbound traffic, and simply read the job ID out of the received message.
      • One queue per worker created by the worker, and bound to the jobs.control exchange using the ID of the job it is currently processing.
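
    Declaring that topology with amqplib might look something like this (a sketch; the exchange types and the some_type routing key are illustrative):

      // Exchanges (durable so they survive a broker restart).
      await channel.assertExchange('jobs.new', 'topic', { durable: true })
      await channel.assertExchange('jobs.status', 'topic', { durable: true })
      await channel.assertExchange('jobs.control', 'topic', { durable: true })

      // Permanent work queue(s), bound to jobs.new by job type.
      await channel.assertQueue('jobs.ready.some_type', { durable: true })
      await channel.bindQueue('jobs.ready.some_type', 'jobs.new', 'some_type')

      // Anonymous, exclusive queues are created on demand: by the server for
      // jobs.status and by the worker for jobs.control, bound with the job ID
      // as described in the numbered steps above.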

    Note that you can attach additional queues to any of these exchanges to get a copy of the traffic, e.g. for logging or debugging. For a topic exchange, just bind the extra queue with the binding key #, and it will get a copy of all messages, without interrupting any existing bindings.
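
    For example, a catch-all audit consumer on the status exchange might look like this (a sketch; the queue name is illustrative):

      // '#' matches every routing key on a topic exchange, so this queue sees all updates.
      await channel.assertQueue('jobs.status.audit', { durable: true })
      await channel.bindQueue('jobs.status.audit', 'jobs.status', '#')
      channel.consume('jobs.status.audit', (msg) =>
        console.log(msg.fields.routingKey, msg.content.toString()), { noAck: true })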