How should I handle RQ workers for overlapping batch processes?

I've started using RQ / Redis to run some long-running jobs asynchronously for my Django site. I'm hoping to do something like the following:

  • I want one queue for each instance of a model. You can think of this model as an API user account (there will not be many of these: 15 to 20 at most).

  • I will be distributing batches of tasks (anywhere from 10 - 500) evenly across the queues. Multiple batches may be added before the first batch completes.

  • With each batch, I would like to start up a worker for each queue that is not actively being worked on, and I would like to run these workers in batch mode (what RQ calls burst mode), so that once they run out of tasks they shut down.

  • I realize I could just not run them in burst mode, and then I would always be working on / listening for work on all of the queues. The problem with this is that I would like to be able to add and remove queues dynamically, so it's better if each batch starts up workers for whichever queues exist at that point.

I realize it might seem odd that I'm distributing tasks across the queues, but the reason for this is that each task in the same queue must be rate limited / throttled according to a service I'm using (think of it as an API rate limit, but where each queue represents a different account). But for my purposes it makes no difference which account the task is running on, so I might as well parallelize across all the accounts.
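The even distribution described above could be sketched roughly as follows. This is only an illustration, not the asker's code: `distribute_round_robin` and the `account-*` queue names are made-up names, and the sketch only plans the assignment rather than talking to Redis.

```python
from itertools import cycle


def distribute_round_robin(tasks, queue_names):
    """Assign tasks to queue names in turn so queue sizes differ by at most one."""
    if not queue_names:
        raise ValueError("at least one queue name is required")
    assignment = {name: [] for name in queue_names}
    # cycle() repeats the queue names for as long as there are tasks left.
    for task, name in zip(tasks, cycle(queue_names)):
        assignment[name].append(task)
    return assignment


batch = list(range(7))  # stand-in for a batch of task arguments
plan = distribute_round_robin(batch, ["account-a", "account-b", "account-c"])
# Each task would then be enqueued on its assigned queue, for example with
# Queue(name).enqueue(some_task, task) in RQ (some_task is hypothetical).
```

Because every batch starts again at the first queue, the earlier queues can end up slightly fuller when many small batches overlap; rotating the starting queue per batch would be one way to even that out.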

The problem that I am facing is that if I start a worker and give it a queue that is already being worked on, I now have two workers operating independently on that queue, so the effective delay between calls on that account is cut in half and the account is hit twice as often as intended. How can I start a worker only if there is not already a worker operating on that queue? I could probably find a hacky solution to this, but I would prefer to handle it the "right" way, and since I don't have much experience with queues I thought I should ask.

I am already implementing my own worker class so that I can dynamically control the queues, so I just need a way to add logic where if that queue is already being worked on, it will not be given a new worker. A simple version of my worker is here:

import sys
from Api.models import *
from rq import Queue, Connection, Worker

# importing the necessary namespace for the tasks to run
from tasks import *

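# dynamically getting the queue names in which I am expecting tasks
# NOTE: the attribute was lost from the original post; `user.name` is a placeholder
queues = [user.name for user in ApiUser.objects.all()]

with Connection():
    qs = list(map(Queue, queues)) or [Queue()]
    w = Worker(qs)
    # burst mode: work until the queues are empty, then exit
    w.work(burst=True)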

  • Finding a solution just meant diving into python-rq's source code a little bit. I might look into improving the documentation. Anyway, this seems to work for my needs!

    import sys
    from Api.models import *
    from rq import Queue, Connection, Worker
    # importing the necessary namespace for the tasks to run
    from tasks import *
    # Start a worker only for queues that no existing worker is listening on.
    with Connection():
        current_workers = Worker.all()
        working_queues = [queue.name for worker in current_workers for queue in worker.queues]
        # NOTE: the attribute was lost from the original post; `user.name` is a placeholder
        proposed_queues = [user.name for user in ApiUser.objects.all()]
        queues_to_start = [queue for queue in proposed_queues if queue not in working_queues]
        if queues_to_start:
            qs = list(map(Queue, queues_to_start))
            w = Worker(qs)
            # burst mode: work until the queues are empty, then exit
            w.work(burst=True)
        else:
            print("Nothing to do here.")
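The filtering step in this answer can also be pulled out into a small function that is easy to test without Redis. The following is a sketch under assumptions: `queues_without_workers` and `fake_worker` are illustrative names, and the stand-in objects only mimic the `worker.queues` / `queue.name` attributes that RQ workers and queues expose. With RQ, the `workers` argument would be the result of `Worker.all()`.

```python
from types import SimpleNamespace


def queues_without_workers(proposed_names, workers):
    """Return the proposed queue names that no listed worker is listening on."""
    # A set gives constant-time membership checks for the filter below.
    covered = {queue.name for worker in workers for queue in worker.queues}
    return [name for name in proposed_names if name not in covered]


def fake_worker(*names):
    """Stand-in for an rq Worker: an object with a list of named queues."""
    return SimpleNamespace(queues=[SimpleNamespace(name=n) for n in names])


workers = [fake_worker("account-a"), fake_worker("account-b", "account-c")]
to_start = queues_without_workers(["account-a", "account-d", "account-e"], workers)
```

One caveat: checking for existing workers and then starting a new one is not atomic, so two launcher scripts run at nearly the same moment could both decide to cover the same queue. If batches can be submitted that close together, some form of locking around the check may be needed.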