Actually, I couldn't find a proper topic. I'm trying to use the Apscheduler
library. I created a class method for the library and added some missing features by myself as a whole new scheduler library. When using an SQLAlchemyJobStore
the added functions to scheduler must be serializable but as I created a new class the added functions are members of the class itself. For example, suppose this code:
from datetime import datetime
from time import sleep
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
class sched:
def __init__(self):
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///test.db')
}
self.scheduler = BackgroundScheduler(jobstores=jobstores)
def add_func(self, some_func, args={}, kwargs={}):
self.scheduler.add_job(
some_func, 'interval', args=args, kwargs=kwargs, seconds=5, replace_existing=True)
def start(self):
self.scheduler.start()
def tick(self, i, j, **kwargs):
print("hello{} {}".format(i, j))
for k in kwargs:
print(k, ":", kwargs[k])
if __name__ == '__main__':
schedule = sched()
schedule.start()
schedule.add_func(schedule.tick, args=[1, 2], kwargs=dict(a=1, b=2))
# using the following three lines solve the problem
# sched = sched()
# sched.start()
# sched.add_func(sched.tick, args=[1, 2], kwargs=dict(a=1, b=2))
while True:
sleep(.1)
Traceback (most recent call last): File "...\Python36\lib\site-packages\apscheduler\executors\base.py", line 125, in run_job retval = job.func(*job.args, **job.kwargs) TypeError: tick() missing 1 required positional argument: 'j'
This error means the whole class cannot be serialized but if I name my instance same as class name (here sched
) it would be fixed.
In the example, the tick
function must be a member of schedule class itself because in my original class scheduler.add_job
gets a modified function from the class methods and currently, I don't want to edit my new schedule class entirely.
My question is when I want to import the class from a file the exact problem happens. Is there any other way to just import the class from a file?
Because the whole class sched
cannot be serialized in Database, class MUST be defined as static. So I changed the class to static class.
from datetime import datetime
from time import sleep
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
class sched:
jobstores = {
'default': SQLAlchemyJobStore(url='sqlite:///test.db')
}
scheduler = BackgroundScheduler(jobstores=jobstores)
@classmethod
def add_func(cls, some_func, args={}, kwargs={}):
cls.scheduler.add_job(
some_func, 'interval', args=args, kwargs=kwargs, seconds=5, replace_existing=True)
@classmethod
def start(cls):
cls.scheduler.start()
@classmethod
def tick(cls, i, j, **kwargs):
print("hello{} {}".format(i, j))
for k in kwargs:
print(k, ":", kwargs[k])
if __name__ == '__main__':
schedule = sched()
schedule.start()
schedule.add_func(schedule.tick, args=[1, 2], kwargs=dict(a=1, b=2))
while True:
sleep(.1)