I want to use rq to run tasks on a separate worker to gather data from a measuring instrument. The end of the task will be signaled by a user pressing a button on a dash app. The problem is that the task itself does not know when to terminate since it doesn't have access to the dash app's context.
I already use meta to pass information from the worker back to the caller, but can I pass information from the caller to the worker?
Example task:
import numpy as np
from rq import get_current_job
from time import sleep, time

def mock_measurement():
    job = get_current_job()
    t_start = time()
    # Run the measurement
    t = []
    i = []
    job.meta['should_stop'] = False  # I want to use this tag to tell the job to stop
    while not job.meta['should_stop']:
        t.append(time() - t_start)
        i.append(np.random.random())
        job.meta['data'] = (t, i)
        job.save_meta()
        sleep(5)
    print("Job Finished")
From the console, I can start a job like this:
import rq
from redis import Redis
queue = rq.Queue('test-app', connection=Redis('localhost', 6379))
job = queue.enqueue('tasks.mock_measurement')
and I would like to be able to do this from the console to signify to the worker it can stop running:
job.meta['should_stop'] = True
job.save_meta()
job.refresh()
However, while the commands above return without an error, they do not actually update the meta dictionary.
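The worker never sees the change because it never re-fetches the meta, so its local copy still has should_stop set to False. But don't do this! Calling save_meta and refresh from both the caller and the worker will lose data, because each save_meta writes the whole meta dictionary and whichever side saves last overwrites the other's changes.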
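To illustrate the lost update, here is a minimal simulation that uses a plain dictionary in place of Redis. The names store, fetch and save are hypothetical stand-ins and not part of rq; the only assumption is that each side saves its whole local copy of the meta, as save_meta does.

```python
import copy

# Stands in for the job's meta field stored in Redis (hypothetical, not rq).
store = {"meta": {}}


def fetch(store):
    """Return a private copy of the stored meta, like loading a job."""
    return copy.deepcopy(store["meta"])


def save(store, meta):
    """Overwrite the stored meta with the whole local copy."""
    store["meta"] = copy.deepcopy(meta)


# Worker and caller each load their own copy of the meta.
worker_meta = fetch(store)
caller_meta = fetch(store)

# The caller sets the flag and saves.
caller_meta["should_stop"] = True
save(store, caller_meta)

# The worker, unaware of the flag, saves its data a moment later.
worker_meta["data"] = ([0.0], [0.42])
save(store, worker_meta)

# The caller's flag has been overwritten by the worker's save.
lost = "should_stop" not in store["meta"]
print(lost)
```

Refreshing before every save narrows the window but does not close it, since the other side can still write between the refresh and the save.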
Instead, use job.connection.set(job.id + ':should_stop', 1, ex=300) in the caller to set a flag directly in Redis (it expires after 300 seconds), and use job.connection.get(job.id + ':should_stop') in the worker to check whether the flag is set.
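Putting this together, a rough sketch of the task with a separate stop flag might look like the following. The helper names stop_key, request_stop and should_stop are made up for illustration, building the key from job.id is an assumption, and random.random() replaces the NumPy call only to keep the example self-contained.

```python
import random
from time import sleep, time


def stop_key(job_id):
    """Redis key for one job's stop flag (the naming scheme is an assumption)."""
    return f"{job_id}:should_stop"


def request_stop(connection, job_id, ttl=300):
    """Caller side: set the flag; Redis drops it after `ttl` seconds."""
    connection.set(stop_key(job_id), 1, ex=ttl)


def should_stop(connection, job_id):
    """Worker side: True once the flag exists in Redis."""
    return connection.get(stop_key(job_id)) is not None


def mock_measurement(poll_interval=5):
    # Imported here so the helpers above can be used without rq installed.
    from rq import get_current_job

    job = get_current_job()
    t_start = time()
    t, i = [], []
    # Only the worker writes to meta; the stop flag lives under its own key.
    while not should_stop(job.connection, job.id):
        t.append(time() - t_start)
        i.append(random.random())
        job.meta['data'] = (t, i)
        job.save_meta()
        sleep(poll_interval)
    print("Job Finished")
```

From the console, calling request_stop(job.connection, job.id) should then end the loop within one poll interval, and the caller can keep reading job.meta['data'] after job.refresh() because it never writes to meta itself.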