Tags: python, redis, distributed-computing

RQ Worker with multiple connections


I have 3 servers on the same network. On each of those servers, a Redis service and some sort of producer are running. The producer enqueues jobs to a local rq queue named tasks, so each server has its own tasks queue.

Also, there's one more server that is running an rq worker. Is it possible to have that worker check the tasks queue on each of the 3 servers?

I have tried creating a list of connections

import redis
from rq import Queue, Worker
from rq import push_connection
# urls = [url1, url2, url3]
connections = list(map(redis.from_url, urls))

which I then use to create a list of queues.

queues = list(map(lambda c: Queue('tasks', connection=c), connections))

Afterwards I push all the connections

for connection in connections:
    push_connection(connection)

and pass the queues to Worker

Worker(queues=queues).work()

This results in the worker only listening on the tasks queue of whatever connection was pushed last.

I've been looking into rq's source code and I think I could write a custom worker class that does this, but before I do that I wanted to ask if there's another way. Maybe even another queueing framework entirely?


Solution

  • Okay, I solved the problem. I'm still unsure whether I have permission to post the actual source code here, so I will outline my solution.

    I had to override register_birth(self), register_death(self), and dequeue_job_and_maintain_ttl(self, timeout). The original implementations of these functions can be found in RQ's source, in rq/worker.py.

    register_birth(self) -> None

    Basically, you have to iterate over all connections, push_connection(connection), complete the registration process, and pop_connection().

    Be careful to only list the queues corresponding to that connection in the mapping variable. The original implementation uses queue_names(self) to get a list of queue names. You'll have to do the same thing queue_names(self) does but only for the relevant queues.
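
    A minimal sketch of that idea (the class name MultiConnectionWorker and the _use_connection helper are my own, not part of rq; I'm assuming the parent implementation reads self.connection and self.queues):

    from contextlib import contextmanager

    from rq import Worker, push_connection, pop_connection


    class MultiConnectionWorker(Worker):
        """Worker that registers itself on several Redis servers."""

        @contextmanager
        def _use_connection(self, connection):
            # Temporarily make `connection` (and only its queues) the
            # worker's active state; the parent implementations read
            # self.connection and self.queues.
            push_connection(connection)
            original_connection, original_queues = self.connection, self.queues
            self.connection = connection
            self.queues = [q for q in original_queues
                           if q.connection is connection]
            try:
                yield
            finally:
                self.connection, self.queues = original_connection, original_queues
                pop_connection()

        def register_birth(self) -> None:
            # Register on every connection, listing only the queues
            # that live on that connection.
            for connection in {q.connection for q in self.queues}:
                with self._use_connection(connection):
                    super().register_birth()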

    register_death(self) -> None

    Essentially the same as register_birth. Iterate over all connections, push_connection(connection), complete the same steps as the original implementation, and pop_connection().
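
    Reusing the hypothetical _use_connection helper from the register_birth sketch above:

    class MultiConnectionWorker(Worker):  # continued from above

        def register_death(self) -> None:
            # Deregister from every server we registered on.
            for connection in {q.connection for q in self.queues}:
                with self._use_connection(connection):
                    super().register_death()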

    dequeue_job_and_maintain_ttl(self, timeout: Optional[int]) -> Tuple[Job, Queue]

    Let's take a look at the original implementation of this function. We'll want to keep everything the same until we get to the try block. Here we want to iterate over all connections endlessly. You can do this by using itertools.cycle.

    Inside the loop, push_connection(connection) and set self.connection to the current connection. If the assignment self.connection = connection is missing, the result of the job may not be returned properly.

    Now we'll proceed to call self.queue_class.dequeue_any similar to the original implementation. But we'll set the timeout to 1 so we can proceed to check another connection if the current one doesn't have any jobs for the worker.

    Make sure self.queue_class.dequeue_any is called with a list of queues corresponding to the current connection. In this case queues contains only the relevant queues.

    result = self.queue_class.dequeue_any(
        queues, 1, connection=connection, job_class=self.job_class)
    

    Afterwards pop_connection(), and do the same check on result as the original implementation. If result is not None we've found a job to do and need to break out of the loop.

    Keep everything else from the original implementation. Don't forget the break at the end of the try block. It breaks out of the while True loop.
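
    Putting those steps together, a condensed sketch (it omits the original's logging and procline bookkeeping, and assumes a 1-second poll raises DequeueTimeout when it expires, as in rq 1.x):

    import itertools

    from rq.exceptions import DequeueTimeout


    class MultiConnectionWorker(Worker):  # continued from above

        def dequeue_job_and_maintain_ttl(self, timeout):
            # Pair every connection with the queues living on it and
            # cycle over those pairs endlessly (see "Another thing" below).
            by_connection = {}
            for queue in self.queues:
                by_connection.setdefault(queue.connection, []).append(queue)
            pairs = itertools.cycle(by_connection.items())

            result = None
            while result is None:
                connection, queues = next(pairs)
                push_connection(connection)
                # Without this assignment the job's result may not be
                # returned properly.
                self.connection = connection
                try:
                    self.heartbeat()  # keep our TTL alive while polling
                    try:
                        # Wait at most 1 second so we can move on to the
                        # next server if this one has no jobs for us.
                        result = self.queue_class.dequeue_any(
                            queues, 1, connection=connection,
                            job_class=self.job_class)
                    except DequeueTimeout:
                        pass  # nothing here, try the next connection
                finally:
                    pop_connection()
            return result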

    Another thing

    Queues contain a reference to their connection. You could use this to create a list of (connection, queues) pairs, where queues contains all the queues that use that connection.

    If you pass the resulting list to itertools.cycle you get the endless iterator you need in overriding dequeue_job_and_maintain_ttl.
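
    For example, a small helper along these lines (the name connection_queue_pairs is mine) builds that list and the endless iterator:

    import itertools

    def connection_queue_pairs(queues):
        # Group queues by the connection each one holds a reference to.
        by_connection = {}
        for queue in queues:
            by_connection.setdefault(queue.connection, []).append(queue)
        return list(by_connection.items())

    # The endless iterator used when overriding dequeue_job_and_maintain_ttl:
    pairs = itertools.cycle(connection_queue_pairs(queues))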

    Update (2023-12-12)

    heartbeat(timeout: Optional[int] = None, pipeline: Optional[Pipeline] = None) -> None

    Recently, I had to fix what turned out to be a heartbeat-related issue and had to override heartbeat. If the given pipeline parameter is not None, just call the parent implementation. Otherwise, call the parent implementation once for every connection in {q.connection for q in self.queues}.
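
    A sketch, again reusing the _use_connection helper and assuming the rq 1.x signature:

    class MultiConnectionWorker(Worker):  # continued from above

        def heartbeat(self, timeout=None, pipeline=None):
            # A caller-supplied pipeline is already bound to a single
            # connection, so defer to the parent unchanged.
            if pipeline is not None:
                return super().heartbeat(timeout, pipeline)
            # Otherwise refresh the worker's heartbeat key on every server.
            for connection in {q.connection for q in self.queues}:
                with self._use_connection(connection):
                    super().heartbeat(timeout)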

    maintain_heartbeats(job: Job)

    This function is called while the worker is waiting for the horse to finish the job. It only handles one connection, so we'll extend it to handle multiple connections. Again, we iterate over all connections and implement almost the same behaviour as the parent.

    In the parent implementation of maintain_heartbeats, the result of executing the operations added to the pipeline is expected to have at least 3 elements. That is not the case when the connection we're checking is not the same as the given job's connection. Thus, before accessing the results array we have to check whether the connection we're currently looking at is the job's connection. The Job class has a connection attribute for this.
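
    A simplified way to sketch this is to let the parent handle the job's own connection, including the results check, and only refresh the worker's heartbeat elsewhere (the job_monitoring_interval + 60 timeout mirrors what the rq 1.x parent uses; check your version):

    class MultiConnectionWorker(Worker):  # continued from above

        def maintain_heartbeats(self, job):
            for connection in {q.connection for q in self.queues}:
                with self._use_connection(connection):
                    if connection is job.connection:
                        # The job's heartbeat and the 3-element results
                        # check only make sense on the server the job
                        # lives on, so delegate to the parent there.
                        super().maintain_heartbeats(job)
                    else:
                        # Elsewhere, only refresh the worker's own key;
                        # the parent version touches just self.connection.
                        super().heartbeat(self.job_monitoring_interval + 60)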

    set_state(self, state: str, pipeline: Optional[Pipeline] = None)

    Call the parent implementation for each connection. This should ensure that the state of the worker is correct on every connection.
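
    A sketch in the same pattern:

    class MultiConnectionWorker(Worker):  # continued from above

        def set_state(self, state, pipeline=None):
            # With an explicit pipeline the caller targets a single
            # connection; defer to the parent.
            if pipeline is not None:
                return super().set_state(state, pipeline)
            # Otherwise mirror the state change on every server.
            for connection in {q.connection for q in self.queues}:
                with self._use_connection(connection):
                    super().set_state(state)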