Tags: python, python-3.x, redis, queue, python-rq

Python RQ - how to trigger a job when multiple other jobs are finished? Multi-job dependency workaround?


I have a nested job structure in my Python Redis queue. First the rncopy job is executed. Once it has finished, the 3 dependent registration jobs follow. When all 3 of these jobs have finished computing, I want to trigger a job that sends a websocket notification to my frontend.

My current attempt:

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))

Unfortunately, it seems that the multi-job dependency feature was never merged into master. I saw that there are currently two pull requests for it on GitHub. Is there a workaround I can use?

Sorry for failing to provide a reproducible example.


Solution

  • New versions (RQ >= 1.8)

    You can simply use the depends_on parameter, passing it a list or a tuple of jobs.

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    
    notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=(t1c_reg, t2_reg, fla_reg))
    
    # you can also use a list instead of a tuple:
    # notify = redisqueue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=[t1c_reg, t2_reg, fla_reg])
    

  • Old versions (RQ < 1.8)

    I use this workaround: if there are n dependencies, I create n-1 wrappers around the real function, and each wrapper depends on a different job.

    This solution is a bit convoluted, but it works.

    rncopy = redisqueue.enqueue(raw_nifti_copymachine, patientid, imagepath, timeout=6000)
    t1c_reg = redisqueue.enqueue(modality_registrator, patientid, "t1c", timeout=6000, depends_on=rncopy)
    t2_reg = redisqueue.enqueue(modality_registrator, patientid, "t2", timeout=6000, depends_on=rncopy)
    fla_reg = redisqueue.enqueue(modality_registrator, patientid, "fla", timeout=6000, depends_on=rncopy)
    
    notify = redisqueue.enqueue(first_wrapper, patient_finished, patientid, t2_reg.id, fla_reg.id, timeout=6000, depends_on=t1c_reg)
    
    def first_wrapper(patient_finished, patientid, t2_reg_id, fla_reg_id):
        # Recover the queue by name; the Queue object itself does not serialize cleanly.
        queue = Queue('YOUR-QUEUE-NAME')
        queue.enqueue(second_wrapper, patient_finished, patientid, fla_reg_id, timeout=6000, depends_on=t2_reg_id)
    
    def second_wrapper(patient_finished, patientid, fla_reg_id):
        queue = Queue('YOUR-QUEUE-NAME')
        queue.enqueue(print, patient_finished, patientid, timeout=6000, depends_on=fla_reg_id)
    

    Some caveats:

    • I don't pass the queue object to the wrappers, because it causes serialization problems; instead, the queue has to be recovered by name inside the wrapper (see the sketch after this list).

    • For the same reason, I pass job.id (a plain string) to the wrappers instead of the job object.
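
    If the worker has no default Redis connection configured, the queue can be rebuilt inside the wrapper with an explicit connection. A minimal sketch, assuming a local Redis instance and the same placeholder queue name as above:

    from redis import Redis
    from rq import Queue
    
    def first_wrapper(patient_finished, patientid, t2_reg_id, fla_reg_id):
        # Assumed connection settings; adjust host/port to your setup.
        connection = Redis()
        # Recover the queue from its name rather than passing the Queue object around.
        queue = Queue('YOUR-QUEUE-NAME', connection=connection)
        # depends_on also accepts a plain job id, so the string can be passed straight through.
        queue.enqueue(second_wrapper, patient_finished, patientid, fla_reg_id,
                      timeout=6000, depends_on=t2_reg_id)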