I'm trying to save changes to a model from inside a Celery task, but the changes are never committed. I have a model that keeps a record of user uploads. Once a file is uploaded, a Celery task runs to process the CSV and save the results to the database, e.g. processing status, time processed, and number of records processed.
In models.py, I have the following methods:
def import_records_data(self):
    total_records = 0
    with self.filepath.file as csvfile:
        reader = csv.reader(csvfile, delimiter=',')
        next(reader, None)  # skip header
        for row in reader:
            # process record
            total_records += 1
    return total_records

def process(self):
    self.date_start_processing = datetime.datetime.utcnow().replace(tzinfo=utc)
    try:
        # process upload data
        records_processed = self.import_records_data()
    except Exception as e:
        self._mark_failed(unicode(e))
    else:
        self._mark_processed(num_records=records_processed)

def _mark_processed(self, num_records, description=None):
    self.status = self.PROCESSED
    self.date_end_processing = datetime.datetime.utcnow().replace(tzinfo=utc)
    self.num_records = num_records
    self.processing_description = description
    self.save()

def _mark_failed(self, description):
    self.status = self.FAILED
    self.processing_description = description
    self.save()

def was_processing_successful(self):
    return self.status == self.PROCESSED
When _mark_processed or _mark_failed is called, the changes are not saved to the database when self.save() is called. The process method is called from tasks.py:
@task(name='csv-process-upload')
def process_upload(upload_id):
    upload = Upload.objects.get(id=upload_id)
    upload.process()
    if upload.was_processing_successful():
        message_user(
            upload.user,
            "Your upload '%s' was processed successfully, %s records processed" % (
                upload.filename,
                upload.num_records))
    else:
        message_user(
            upload.user,
            "Your upload '%s' could not be processed, error message: %s" % (
                upload.filename,
                upload.processing_description,))
What could be preventing the model from saving? When I debug _mark_processed in the shell and call self.save(), the changes are reflected in the database.
Try using django-celery-transactions to call your tasks. It makes sure the view's transaction has been committed before the task is triggered, so the task does not run against data the view has not finished writing.
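To illustrate the kind of race this is meant to avoid, here is a minimal sketch using only the standard library's sqlite3 module rather than Django or Celery. The `upload` table, the two connection names, and the function name are hypothetical stand-ins for the view's transaction and the worker's lookup; this is not the code from the question.

```python
import os
import sqlite3
import tempfile


def visible_rows_before_and_after_commit():
    """Count rows seen by a second connection before and after the first commits."""
    path = os.path.join(tempfile.mkdtemp(), "demo.db")
    view = sqlite3.connect(path)    # stands in for the web view's connection
    worker = sqlite3.connect(path)  # stands in for the Celery worker's connection

    view.execute("CREATE TABLE upload (id INTEGER PRIMARY KEY, status TEXT)")
    view.commit()

    # The "view" writes a row but has not committed yet.
    view.execute("INSERT INTO upload (id, status) VALUES (1, 'pending')")

    # A task dispatched now would look the row up too early.
    before = worker.execute("SELECT COUNT(*) FROM upload").fetchall()[0][0]

    # Once the view commits, the same lookup sees the row.
    view.commit()
    after = worker.execute("SELECT COUNT(*) FROM upload").fetchall()[0][0]

    view.close()
    worker.close()
    return before, after


if __name__ == "__main__":
    print(visible_rows_before_and_after_commit())
```

The worker's first query runs while the view's write is still uncommitted, which is the window that delaying task dispatch until after commit is meant to close. In Django 1.9 and later, `transaction.on_commit` can defer a callback (such as the call that queues the task) until the surrounding transaction commits, without a third-party package.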