I am trying to run an ML training script every hour, but after each run the memory usage increases by roughly 20%. After 3-4 hours it reaches 90% and the script throws a MemoryError. I am wondering why the memory is not released when the train function finishes.
This behaviour does not show up if I run the train function manually (that is, without any kind of thread scheduler, calling train two or three times one after another).
Any suggestions on how to train the model at a fixed interval?
Here is the code.
import pickle
import datetime

import pandas as pd
from pymongo import MongoClient
from apscheduler.schedulers.blocking import BlockingScheduler

# databaseURI and algo are defined elsewhere in the script


def train():
    client = MongoClient(databaseURI)
    db = client['mydb']

    # Copy only the fields needed for training
    movie_data = []
    for index, obj in enumerate(db.movies.find({})):
        movie_obj = {}
        movie_obj['_id'] = obj['_id']
        movie_obj['title'] = obj['title']
        movie_obj['rating'] = obj['rating']
        movie_data.append(movie_obj)

    user_data = []
    for index, obj in enumerate(db.users.find({})):
        user_obj = {}
        user_obj['_id'] = obj['_id']
        user_obj['username'] = obj['username']
        user_obj['movie_id'] = obj['movie_id']
        user_obj['rating'] = obj['rating']
        user_data.append(user_obj)

    movie_data_df = pd.DataFrame(movie_data)
    user_data_df = pd.DataFrame(user_data)

    # some ML training ALGO
    trainedModel = algo.train(user_data_df, movie_data_df)
    trainedModel.to_pickle('files/trained.pkl')


scheduler = BlockingScheduler()
scheduler.add_job(train, 'interval', hours=1, next_run_time=datetime.datetime.now())
scheduler.start()
Job stores house the scheduled jobs. The default job store simply keeps the jobs in memory, but others store them in various kinds of databases. A job’s data is serialized when it is saved to a persistent job store, and deserialized when it’s loaded back from it. Job stores (other than the default one) don’t keep the job data in memory, but act as middlemen for saving, loading, updating and searching jobs in the backend.
I suggest trying one of the following solutions:
Change the jobstore from the default (i.e. memory) to some persistent place (Example).
Or try setting the parameter replace_existing to True (the default is False).
scheduler.add_job(train, 'interval', hours=1,
                  next_run_time=datetime.datetime.now(), replace_existing=True)
Side note:
I think there could be another, uglier way to fix it (I haven't tried it!): add a listener that watches for crashes and restarts the whole process. (If you try it, feel free to amend it into something more Pythonic!)
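import gc
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

scheduler = BlockingScheduler()
scheduler.add_job(train, 'interval', hours=1, next_run_time=datetime.datetime.now())


def my_listener(event):
    global scheduler
    # event.exception is only set when the job raised an error
    if event.exception:
        # wait=False: the listener may run on an executor thread,
        # which cannot wait for itself to finish (untested assumption)
        scheduler.shutdown(wait=False)
        gc.collect()
        # start over with a fresh scheduler
        scheduler = BlockingScheduler()
        scheduler.add_job(train, 'interval', hours=1, next_run_time=datetime.datetime.now())
        scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
        scheduler.start()


scheduler.add_listener(my_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
scheduler.start()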
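A related variation on the "restart the process" idea, which is my own sketch and not something from the question: run each training in a short-lived child process, so that whatever it allocated goes back to the OS when the child exits. The helper names run_in_child and train_isolated, and the script name train_job.py, are hypothetical; train_job.py is assumed to call train() once and exit.

```python
import subprocess
import sys


def run_in_child(argv, timeout=None):
    """Run a command in a separate process and return its exit code.

    Returns None if the command did not finish within `timeout` seconds.
    """
    try:
        completed = subprocess.run(argv, timeout=timeout)
    except subprocess.TimeoutExpired:
        # subprocess.run() kills the child on timeout before raising
        return None
    return completed.returncode


def train_isolated():
    """Job for the scheduler: run the training in a fresh interpreter.

    'train_job.py' is a hypothetical script that calls train() once.
    """
    exit_code = run_in_child([sys.executable, "train_job.py"])
    if exit_code != 0:
        print(f"training run failed (exit code {exit_code})")
    return exit_code


if __name__ == "__main__":
    # Demo with an inline one-liner instead of the hypothetical script
    print(run_in_child([sys.executable, "-c", "print('trained')"]))
```

The scheduler would then be given train_isolated instead of train, for example scheduler.add_job(train_isolated, 'interval', hours=1). The scheduler process itself stays small because the DataFrames only ever exist in the child.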