I have a Celery task on Heroku that connects to an external API, retrieves some data, stores it in the database, and repeats this several hundred times. Very quickly (after ~10 loops) Heroku starts warning about high memory usage. Any ideas?
tasks.py
@app.task
def retrieve_details():
    for p in PObj.objects.filter(some_condition=True):
        p.fetch()
models.py
def fetch(self):
    v_data = self.service.getV(Number=self.v.number)
    response = self.map_response(v_data)
    for key in ["some_key", "some_other_key"]:
        setattr(self.v, key, response.get(key))
    self.v.save()
Heroku logs
2017-01-01T10:26:25.457354+00:00 heroku run.5891 - - Process running mem=595M(116.2%)
2017-01-01T10:26:25.457411+00:00 heroku run.5891 - - Error R14 (Memory quota exceeded)
To expand on rdegges's thoughts, here are two strategies I have used in the past with Celery/Python to reduce the memory footprint: (1) kick off subtasks that each process exactly one object, and/or (2) use generators.
Kick off subtasks that each process exactly one object:
@app.task
def retrieve_details():
    qs = PObj.objects.filter(some_condition=True)
    for p_id in qs.values_list('id', flat=True):
        do_fetch.delay(p_id)

@app.task
def do_fetch(p_id):
    p = PObj.objects.get(id=p_id)
    p.fetch()
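Note that the loop sends only primary keys to do_fetch rather than whole model instances: keeping task arguments small and re-fetching the row inside the subtask is the usual pattern here, and it means each task holds at most one PObj in memory at a time.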
Now you can tune Celery so that it kills off each worker process after it has executed N tasks, keeping the memory footprint low, using --max-tasks-per-child.
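For example (the app module name proj and the limit of 50 are placeholders; tune the number to your workload), when starting the worker:

celery -A proj worker --max-tasks-per-child=50

The same limit can be set in configuration via the CELERYD_MAX_TASKS_PER_CHILD setting (renamed worker_max_tasks_per_child in Celery 4).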
Using generators: you can also fetch the objects in chunks with a generator, so that each batch of PObj instances can (in theory) be thrown away after fetch has been called:
def ps_of_interest(chunk=10):
    n = chunk
    start = 0
    while n == chunk:
        some_ps = list(PObj.objects.filter(some_condition=True)[start:start + n])
        n = len(some_ps)
        start += chunk
        for p in some_ps:
            yield p
@app.task
def retrieve_details():
    for p in ps_of_interest():
        p.fetch()
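A related alternative, if your Django version and database backend support it well, is the built-in QuerySet.iterator(), which streams results instead of caching them in the queryset; a minimal sketch:

@app.task
def retrieve_details():
    # .iterator() avoids populating the queryset's result cache,
    # so each PObj becomes collectable once its fetch() returns
    for p in PObj.objects.filter(some_condition=True).iterator():
        p.fetch()

Either way the idea is the same: never hold the full result set in memory at once.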
For my money, I’d go with option #1.