I have 3 servers in the same network. On each of those servers, a Redis service and some sort of producer are running. The producer enqueues jobs to a local rq queue named `tasks`, so each server has its own `tasks` queue.

There's also one more server that runs an rq worker. Is it possible to have that worker check the `tasks` queue on each of the 3 servers?
I have tried creating a list of connections:

```python
import redis
from rq import Queue, Worker, push_connection

# urls = [url1, url2, url3]
connections = list(map(redis.from_url, urls))
```
which I then use to create a list of queues:

```python
queues = list(map(lambda c: Queue('tasks', connection=c), connections))
```
Afterwards I push all the connections:

```python
for connection in connections:
    push_connection(connection)
```

and pass the queues to `Worker`:

```python
Worker(queues=queues).work()
```
This results in the worker only listening on `tasks` on whatever connection was pushed last.
I've been looking into the rq source code, and I think I could write a custom worker class that does this. But before I do that, I wanted to ask if there's another way, maybe even another queueing framework entirely?
Okay, I solved the problem. I'm still unsure whether I have permission to post the actual source code here, so I will outline my solution.
I had to override `register_birth(self)`, `register_death(self)`, and `dequeue_job_and_maintain_ttl(self, timeout)`. The original implementations of these functions can be found in rq's `worker.py`.
`register_birth(self) -> None`

Basically, you have to iterate over all connections, `push_connection(connection)`, complete the registration process, and `pop_connection()`.

Be careful to only list the queues corresponding to the current connection in the `mapping` variable. The original implementation uses `queue_names(self)` to get a list of queue names. You'll have to do the same thing `queue_names(self)` does, but only for the relevant queues.
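Since the answer only outlines the steps, here is a minimal sketch of one way to do this, assuming rq 1.x internals (where `push_connection`/`pop_connection` exist). Rather than copying the body of `register_birth`, it temporarily narrows the worker to one connection and its queues and delegates to the parent implementation, which then only lists the relevant queue names in its `mapping`. The class name `MultiConnectionWorker` is made up for illustration:

```python
from rq import Worker, push_connection, pop_connection


class MultiConnectionWorker(Worker):  # hypothetical name
    def register_birth(self):
        all_queues = self.queues
        for connection in {q.connection for q in all_queues}:
            push_connection(connection)
            # Narrow the worker to this connection and its queues, so
            # the parent implementation registers only the queue names
            # that live on this connection.
            self.connection = connection
            self.queues = [q for q in all_queues
                           if q.connection is connection]
            try:
                super().register_birth()
            finally:
                self.queues = all_queues
                pop_connection()
```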
`register_death(self) -> None`

Essentially the same as `register_birth`. Iterate over all connections, `push_connection(connection)`, complete the same steps as the original implementation, and `pop_connection()`.
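Continuing the hypothetical `MultiConnectionWorker` sketch above, with the same caveats:

```python
class MultiConnectionWorker(Worker):  # hypothetical name
    def register_death(self):
        for connection in {q.connection for q in self.queues}:
            push_connection(connection)
            # The parent implementation writes the death timestamp via
            # self.connection, so point it at the current connection.
            self.connection = connection
            try:
                super().register_death()
            finally:
                pop_connection()
```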
`dequeue_job_and_maintain_ttl(self, timeout: Optional[int]) -> Tuple[Job, Queue]`

Let's take a look at the original implementation of this function. We'll want to keep everything the same until we get to the `try` block. There, we want to iterate over all connections endlessly, which you can do with `itertools.cycle`.

Inside the loop, `push_connection(connection)` and set `self.connection` to the current connection. If `self.connection = connection` is missing, the result of the job may not be properly returned.
Now we'll proceed to call `self.queue_class.dequeue_any`, similar to the original implementation, but we'll set the timeout to `1` so we can move on and check another connection if the current one doesn't have any jobs for the worker.

Make sure `self.queue_class.dequeue_any` is called with the list of queues corresponding to the current connection. In this case, `queues` contains only the relevant queues:
```python
result = self.queue_class.dequeue_any(
    queues, 1, connection=connection, job_class=self.job_class)
```
Afterwards, `pop_connection()` and do the same check on `result` as the original implementation: if `result` is not `None`, we've found a job to do and need to `break` out of the loop.
Keep everything else from the original implementation. Don't forget the `break` at the end of the `try` block; it breaks out of the `while True` loop.
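Putting those steps together, a sketch could look like the following. Again, this assumes rq 1.x internals and omits the original's logging; the `(connection, queues)` pairs are built inline here (the next paragraph describes the same grouping):

```python
import itertools

from rq import Worker, push_connection, pop_connection
from rq.exceptions import DequeueTimeout
from rq.worker import WorkerStatus


class MultiConnectionWorker(Worker):  # hypothetical name
    def dequeue_job_and_maintain_ttl(self, timeout):
        result = None
        self.set_state(WorkerStatus.IDLE)
        self.procline('Listening on ' + ','.join(self.queue_names()))

        # Endless round-robin over (connection, queues) pairs.
        pairs = itertools.cycle(
            [(c, [q for q in self.queues if q.connection is c])
             for c in {q.connection for q in self.queues}])

        while True:
            self.heartbeat()
            connection, queues = next(pairs)
            push_connection(connection)
            # Without this, job results may not be returned properly.
            self.connection = connection
            try:
                # Short timeout (1s) so we move on to the next
                # connection when this one has no jobs for the worker.
                result = self.queue_class.dequeue_any(
                    queues, 1, connection=connection,
                    job_class=self.job_class)
                if result is not None:
                    break  # leaves the `while True` loop
            except DequeueTimeout:
                pass
            finally:
                pop_connection()

        self.heartbeat()
        return result
```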
Queues contain a reference to their connection. You could use this to create a list of `(connection, queues)` pairs, where `queues` contains all queues with connection `connection`. If you pass the resulting list to `itertools.cycle`, you get the endless iterator you need when overriding `dequeue_job_and_maintain_ttl`.
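For example, a small standalone helper for building those pairs could look like this (the helper name is made up):

```python
import itertools
from collections import defaultdict


def connection_queue_pairs(queues):
    """Group queues by connection into (connection, [queues]) pairs."""
    grouped = defaultdict(list)
    for queue in queues:
        grouped[queue.connection].append(queue)
    return list(grouped.items())


# The endless iterator used in dequeue_job_and_maintain_ttl:
# pairs = itertools.cycle(connection_queue_pairs(self.queues))
```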
Update (2023-12-12)
`heartbeat(timeout: Optional[int] = None, pipeline: Optional[Pipeline] = None) -> None`

Recently, I had to fix what turned out to be a heartbeat-related issue and had to override `heartbeat`. If the given `pipeline` parameter is not `None`, just call the parent implementation. Otherwise, call the parent implementation for all connections in `self.connections = {q.connection for q in self.queues}`.
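As a sketch, continuing the hypothetical `MultiConnectionWorker` and assuming the parent's `heartbeat(timeout=None, pipeline=None)` signature from recent rq 1.x:

```python
class MultiConnectionWorker(Worker):  # hypothetical name
    def heartbeat(self, timeout=None, pipeline=None):
        if pipeline is not None:
            # A pipeline is already bound to one connection; defer to
            # the parent implementation.
            super().heartbeat(timeout, pipeline=pipeline)
            return
        # Otherwise, send the heartbeat to every connection.
        for connection in {q.connection for q in self.queues}:
            self.connection = connection
            super().heartbeat(timeout)
```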
`maintain_heartbeats(job: Job)`

This function is called while the worker is waiting for the horse to finish the task. It only handles one connection, so we'll extend its functionality to handle multiple connections. Again, we iterate over all connections and implement almost the same behaviour as the parent.

In the parent implementation of `maintain_heartbeats`, the result of executing the operations added to the pipeline is expected to have at least 3 elements. This is not the case when the connection we're checking is not the connection of the given job. Thus, before accessing the result array, we have to check whether the connection we are currently looking at is the job's connection. The `Job` class has a `connection` attribute.
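A sketch, assuming the parent implementation from recent rq versions (worker heartbeat plus job heartbeat on one pipeline, where a job-heartbeat result of `1` means the job key was re-created after the job had been deleted):

```python
from rq.utils import utcnow


class MultiConnectionWorker(Worker):  # hypothetical name
    def maintain_heartbeats(self, job):
        for connection in {q.connection for q in self.queues}:
            with connection.pipeline() as pipeline:
                self.heartbeat(self.job_monitoring_interval + 60,
                               pipeline=pipeline)
                if connection is job.connection:
                    # Only the job's own connection knows about the job,
                    # so the job heartbeat only happens there.
                    job.heartbeat(utcnow(), self.get_heartbeat_ttl(job),
                                  pipeline=pipeline, xx=True)
                results = pipeline.execute()
                # The third result only exists on the job's connection;
                # guard before indexing into the result array.
                if connection is job.connection and results[2] == 1:
                    # The heartbeat re-created an already-deleted job
                    # key; remove the stray key again.
                    connection.delete(job.key)
```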
`set_state(self, state: str, pipeline: Optional[Pipeline] = None)`

Call the parent implementation for each connection. This should ensure that the state of the worker is correct on every connection.
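A final sketch, with the same caveats as the ones above:

```python
class MultiConnectionWorker(Worker):  # hypothetical name
    def set_state(self, state, pipeline=None):
        if pipeline is not None:
            super().set_state(state, pipeline=pipeline)
            return
        # Write the state on every connection so each server sees a
        # consistent view of this worker.
        for connection in {q.connection for q in self.queues}:
            self.connection = connection
            super().set_state(state)
```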