python-3.x django-rest-framework apscheduler

How to schedule user-defined jobs with APScheduler in a Django view


The idea is to let users submit a date and time to the server so that a job is scheduled to run at the time they specify.

My first choice was to do this in the view that receives the data posted from the frontend. However, APScheduler raises a ValueError.

Below is a snippet from the view I want to use for user-scheduled jobs, with just a small test job.

class SchedulePolicyDeployView(APIView):
    def post(self, request, version):

        def test_job():
            print("cron job test")

        scheduler.start()
        scheduler.add_job(test_job, "cron", id="test_job", day="*", minute="*/1")

        return Response("job scheduled successfully")

The error message raised is:

ValueError: This Job cannot be serialized since the reference to its callable (<function SchedulePolicyDeployView.post.<locals>.test_job at 0x05694C00>) could not be determined. Consider giving a textual reference (module:function name) instead.
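The error arises because `test_job` is defined inside `post`, so it has no importable "module:function" name that a scheduler could store and look up again later. The sketch below uses only the standard library to illustrate the difference. `can_be_referenced` is a hypothetical helper written for this example, a rough approximation of the idea and not APScheduler's actual code.

```python
def module_level_job():
    print("cron job test")


def make_nested_job():
    # Mirrors the view in the question: the job is defined inside another function.
    def test_job():
        print("cron job test")
    return test_job


def can_be_referenced(func):
    """Hypothetical check: could func be found again by its dotted name alone?"""
    name = func.__qualname__
    return "<locals>" not in name and "<lambda>" not in name


nested = make_nested_job()
# A nested function's qualified name contains "<locals>", which cannot be imported.
print(nested.__qualname__, can_be_referenced(nested))
print(module_level_job.__qualname__, can_be_referenced(module_level_job))
```

Moving the job function to module level, or passing a textual reference as the error message suggests, avoids the problem.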

Solution

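  • I did fix this issue in my code, but it has changed quite a bit since I asked this question. I've tried to bring it all together here. The key change, as far as I can tell, is that the job function (deploy_policy) is now defined at module level instead of inside the view method, so APScheduler can build a reference to it: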

    *assuming the scheduler object has already been initialised

    scheduler.py

    from datetime import datetime

    def schedule_job(arg1, arg2, arg3, arg4, date, time, job_id):
        # deploy_policy is a module-level function, so APScheduler can serialize a reference to it
        run_date = datetime(int(date['year']), int(date['month']), int(date['day']), int(time['hour']), int(time['minute']), 0)
        scheduler.add_job(deploy_policy, 'date', args=[arg1, arg2, arg3, arg4], run_date=run_date, id=str(job_id))
    

    views.py

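    from rest_framework.response import Response
    from rest_framework.views import APIView

    from scheduler import schedule_job

    class SchedulePolicyDeployView(APIView):
        def post(self, request, version):
            # apic, username, password, etc. are assumed to be extracted from the request earlier in this method (omitted here)
            schedule_job(apic, username, password, policy_url_and_payload_result, date_dict, time_dict, payload["job_id"])
            return Response({"result": "success"})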
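The run_date construction above can be pulled into a small helper, which also makes it easy to reject malformed input or times that have already passed before anything reaches the scheduler. This is a minimal sketch, assuming the posted date and time arrive as dicts with the keys used in the answer (year, month, day, hour, minute). `build_run_date` and its `now` parameter are hypothetical names introduced for illustration, not part of APScheduler.

```python
from datetime import datetime


def build_run_date(date, time, now=None):
    """Combine the posted date/time dicts into a naive datetime.

    Raises ValueError if a field is missing or invalid, or if the
    resulting time is not in the future.
    """
    try:
        run_date = datetime(
            int(date["year"]), int(date["month"]), int(date["day"]),
            int(time["hour"]), int(time["minute"]), 0,
        )
    except KeyError as exc:
        # Normalise missing keys to ValueError so callers handle one exception type.
        raise ValueError(f"missing field: {exc}") from exc

    now = now or datetime.now()  # `now` is injectable to keep the helper testable
    if run_date <= now:
        raise ValueError("run_date must be in the future")
    return run_date
```

In the view, a ValueError from this helper could be caught and turned into a 400 response instead of scheduling the job.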