Tags: python, python-2.7, queue, scheduled-tasks, scheduler

How to pass arguments when executing jobs in parallel with a shared job queue using the Python schedule module


I want to run several jobs in parallel, so I followed the job queue example here. When I tried to pass arguments to the jobs, each job executed only once and then the worker kept raising the exception 'NoneType' object is not callable. The code is listed below:

import Queue
import schedule
import sys
import threading
import time

def job(arg):
    print 'Argument is %s' % arg

def worker_main():
    while True:
        try:
            job_func = jobqueue.get()
            job_func()
        except Exception as e:
            print e

jobqueue = Queue.Queue()

schedule.every(2).seconds.do(jobqueue.put, job(1))
schedule.every(2).seconds.do(jobqueue.put, job(2))

worker_thread = threading.Thread(target=worker_main)
worker_thread.start()

while True:
    try:
        schedule.run_pending()
        time.sleep(1)
    except Exception as e:
        print e
        sys.exit()

The output is:

Argument is 1
Argument is 2
'NoneType' object is not callable
'NoneType' object is not callable
'NoneType' object is not callable
'NoneType' object is not callable
'NoneType' object is not callable
'NoneType' object is not callable
...

Any ideas on how to solve this?


Solution

  • The reason is that the second argument passed to the do method in schedule.every(2).seconds.do(jobqueue.put, job(1)) is actually None.

    The expression job(1) calls the job function immediately, passing 1 (or 2) as its argument. The return value of job (None, since it only prints) is then passed as the second argument to do. So instead of a function reference, None is stored in the job queue. This is also why each message is printed exactly once: the printing happens when the jobs are registered, not when the worker runs them.
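To see why None ends up in the queue, here is a minimal sketch that reproduces only the evaluation order, without the schedule module. It uses the parenthesized print form so that it should run on both Python 2.7 and Python 3. The name result is introduced purely for illustration.

```python
def job(arg):
    # Mirrors the job from the question: prints, then implicitly returns None.
    print('Argument is %s' % arg)

# job(1) is evaluated right here, before any scheduler or queue sees it.
result = job(1)

print(result is None)    # this is the value that would be queued
print(callable(result))  # calling it is what fails in the worker
print(callable(job))     # the function object itself is what should be queued
```

The worker later tries to call whatever it pulled from the queue, which is why the error only shows up there and not at the line that caused it.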

    The difficulty with passing arguments is that the do method of the schedule package does accept extra arguments for the job it runs, but the job being scheduled here is jobqueue.put, so any extra arguments go to put. The queue items themselves are bare function references with no arguments attached.

    One solution is to put each job into the queue together with its arguments. The worker then takes both from the queue and calls the job with those arguments. Something like this:

    import Queue
    import schedule
    import sys
    import threading
    import time
    
    def job(arg):
        print 'Argument is %s' % arg
    
    def worker_main():
        while True:
            try:
                # Each queue item is a (function, argument list) pair.
                job_func, job_args = jobqueue.get()
                job_func(*job_args)
            except Exception as e:
                print e
    
    jobqueue = Queue.Queue()
    
    # Queue the function reference and its arguments; do not call job here.
    schedule.every(2).seconds.do(jobqueue.put, (job, [1]))
    schedule.every(2).seconds.do(jobqueue.put, (job, [2]))
    
    worker_thread = threading.Thread(target=worker_main)
    # Daemon thread, so that sys.exit() below can actually end the process.
    worker_thread.daemon = True
    worker_thread.start()
    
    while True:
        try:
            schedule.run_pending()
            time.sleep(1)
        except Exception as e:
            print e
            sys.exit()
    

    Here each queue item is a tuple holding the job function reference and a list of its arguments. The worker fetches the tuple, unpacks it, and passes the arguments to the job function.
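The tuple idea can be tried out without the scheduler. The sketch below is only an illustration under a few assumptions that are not in the original answer: items are enqueued by hand instead of by schedule, the tuple also carries keyword arguments, the job records its message in a results list instead of printing, and a hypothetical STOP sentinel lets the worker exit. The import fallback is there so that it should run on both Python 2.7 and Python 3.

```python
import threading

try:
    import Queue as queue  # Python 2
except ImportError:
    import queue  # Python 3

results = []

def job(arg, prefix='Argument is'):
    # Record the message so the effect is visible after the worker finishes.
    results.append('%s %s' % (prefix, arg))

STOP = object()  # hypothetical sentinel telling the worker to exit

def worker_main(jobqueue):
    while True:
        item = jobqueue.get()
        if item is STOP:
            break
        # Unpack the function together with its positional and keyword arguments.
        job_func, job_args, job_kwargs = item
        job_func(*job_args, **job_kwargs)

jobqueue = queue.Queue()
# Stand-ins for what the scheduled jobqueue.put calls would enqueue.
jobqueue.put((job, [1], {}))
jobqueue.put((job, [2], {'prefix': 'Arg is'}))
jobqueue.put(STOP)

worker_thread = threading.Thread(target=worker_main, args=(jobqueue,))
worker_thread.start()
worker_thread.join()

print(results)
```

With a single worker the items are processed in the order they were queued, so the two messages appear in registration order.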

    Another solution is to wrap each call (job(1) and job(2)) in a function that takes no arguments, then put those wrapper functions into the job queue, like this:

    def job1():
        job(1)
    
    def job2():
        job(2)
    
    jobqueue = Queue.Queue()
    
    schedule.every(2).seconds.do(jobqueue.put, job1)
    schedule.every(2).seconds.do(jobqueue.put, job2)
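
A more compact variant of the wrapper approach, offered as a sketch and not as part of the original answer, is to build the zero-argument callables with functools.partial or a lambda instead of writing job1 and job2 by hand. The example below leaves out the scheduler and the worker thread and drains the queue directly, and the results list is an assumption added so the outcome can be checked.

```python
import functools

try:
    import Queue as queue  # Python 2
except ImportError:
    import queue  # Python 3

results = []

def job(arg):
    # Record the message instead of printing so the result can be inspected.
    results.append('Argument is %s' % arg)

jobqueue = queue.Queue()

# With the schedule module, the registration would presumably look like:
#   schedule.every(2).seconds.do(jobqueue.put, functools.partial(job, 1))
jobqueue.put(functools.partial(job, 1))
jobqueue.put(functools.partial(job, 2))
jobqueue.put(lambda: job(3))  # a lambda works the same way

# Drain the queue the way the worker would: every item is called with no arguments.
while not jobqueue.empty():
    job_func = jobqueue.get()
    job_func()

print(results)
```

Because every queued item is again a plain callable, the worker from the question can stay exactly as it was, calling job_func() with no arguments.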