I am using post-save signal
for a model to start a celery task. It was working before but suddenly, it is giving me a TransactionManagementError.
Model
class FieldSightXF(models.Model):
xf = models.ForeignKey(XForm, related_name="field_sight_form")
site = models.ForeignKey(Site, related_name="site_forms", null=True, blank=True)
project = models.ForeignKey(Project, related_name="project_forms", null=True, blank=True)
is_staged = models.BooleanField(default=False)
is_scheduled = models.BooleanField(default=False)
date_created = models.DateTimeField(auto_now=True)
date_modified = models.DateTimeField(auto_now=True)
schedule = models.OneToOneField(Schedule, blank=True, null=True, related_name="schedule_forms")
stage = models.OneToOneField(Stage, blank=True, null=True, related_name="stage_forms")
shared_level = models.IntegerField(default=2, choices=SHARED_LEVEL)
form_status = models.IntegerField(default=0, choices=FORM_STATUS)
fsform = models.ForeignKey('self', blank=True, null=True, related_name="parent")
is_deployed = models.BooleanField(default=False)
is_deleted = models.BooleanField(default=False)
is_survey = models.BooleanField(default=False)
from_project = models.BooleanField(default=True)
default_submission_status = models.IntegerField(default=0, choices=FORM_STATUS)
logs = GenericRelation('eventlog.FieldSightLog')
class Meta:
db_table = 'fieldsight_forms_data'
# unique_together = (("xf", "site"), ("xf", "is_staged", "stage"),("xf", "is_scheduled", "schedule"))
verbose_name = _("XForm")
verbose_name_plural = _("XForms")
ordering = ("-date_created",)
Post-save signal
@receiver(post_save, sender=FieldSightXF)
def share_form(sender, instance, created, **kwargs):
if instance.project is not None and created:
from onadata.apps.fsforms.tasks import share_form_managers
task_obj = CeleryTaskProgress.objects.create(user=instance.xf.user,
description="Share Forms",
task_type=17, content_object=instance)
if task_obj:
try:
share_form_managers.delay(instance.id, task_obj.id)
except:
pass
post_save.connect(create_messages, sender=FieldSightXF)
The CeleryTaskProgress
is used to track the progress of the celery tasks.
Task
@shared_task(max_retires=5)
def share_form_managers(fxf, task_id):
fxf = FieldSightXF.objects.get(pk=fxf)
userrole = UserRole.objects.filter(project=fxf.project, group__name='Project Manager')
users = User.objects.filter(user_roles__in=userrole)
shared = share_form(users, fxf.xf)
if shared:
CeleryTaskProgress.objects.filter(id=task_id).update(status=2)
else:
CeleryTaskProgress.objects.filter(id=task_id).update(status=3)
share_form method
def share_form(users, xform):
from onadata.apps.fsforms.models import ObjectPermission, Asset
for user in users:
try:
codenames = ['view_asset', 'change_asset']
permissions = Permission.objects.filter(content_type__app_label='kpi', codename__in=codenames)
for perm in permissions:
object_id = Asset.objects.get(uid=xform.id_string).id
content_type = ContentType.objects.get(id=21)
# Create the new permission
new_permission = ObjectPermission.objects.create(
object_id=object_id,
content_type=content_type,
user=user,
permission_id=perm.pk,
deny=False,
inherited=False
)
except:
return False
else:
return True
What this process does is that whenever an object of FieldSightXF
is created(A form is assigned to a project), then the form is shared to the project managers of that project.
Previously there was no problem as I was passing the FieldSightXF
object as a parameter to the task, but now I am passing the object id as:
Previously
share_form_managers.delay(instance, task_obj.id)
Current
share_form_managers.delay(instance.id, task_obj.id)
Now, both the cases gives me the mentioned error. If I comment the above line from the post-save signal method, the error is gone.
I did try the suggestions from other answers that were given to similar problem but they did not work for me.
Full Error Traceback:
ERROR 2019-06-13 10:49:50,743 base 11527 140653367232256 Internal Server Error: /forms/api/fxf/
Traceback (most recent call last):
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/core/handlers/base.py", line 132, in get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 58, in wrapped_view
return view_func(*args, **kwargs)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/viewsets.py", line 87, in view
return self.dispatch(request, *args, **kwargs)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 466, in dispatch
response = self.handle_exception(exc)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 463, in dispatch
response = handler(request, *args, **kwargs)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/mixins.py", line 21, in create
self.perform_create(serializer)
File "/home/sanip/naxa/source/fieldsight/onadata/apps/fsforms/viewsets/FieldSightXformViewset.py", line 85, in perform_create
fxf.save()
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 734, in save
force_update=force_update, update_fields=update_fields)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 762, in save_base
updated = self._save_table(raw, cls, force_insert, force_update, using, update_fields)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 827, in _save_table
forced_update)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 877, in _do_update
return filtered._update(values) > 0
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/query.py", line 580, in _update
return query.get_compiler(self.db).execute_sql(CURSOR)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 1062, in execute_sql
cursor = super(SQLUpdateCompiler, self).execute_sql(result_type)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 840, in execute_sql
cursor.execute(sql, params)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 79, in execute
return super(CursorDebugWrapper, self).execute(sql, params)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 59, in execute
self.db.validate_no_broken_transaction()
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/base/base.py", line 327, in validate_no_broken_transaction
"An error occurred in the current transaction. You can't "
TransactionManagementError: An error occurred in the current transaction. You can't execute queries until the end of the 'atomic' block.
Internal Server Error: /forms/api/fxf/
Traceback (most recent call last):
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/core/handlers/base.py", line 132, in get_response
response = wrapped_callback(request, *callback_args, **callback_kwargs)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 58, in wrapped_view
return view_func(*args, **kwargs)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/viewsets.py", line 87, in view
return self.dispatch(request, *args, **kwargs)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 466, in dispatch
response = self.handle_exception(exc)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 463, in dispatch
response = handler(request, *args, **kwargs)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/mixins.py", line 21, in create
self.perform_create(serializer)
File "/home/sanip/naxa/source/fieldsight/onadata/apps/fsforms/viewsets/FieldSightXformViewset.py", line 85, in perform_create
fxf.save()
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 734, in save
force_update=force_update, update_fields=update_fields)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 762, in save_base
updated = self._save_table(raw, cls, force_insert, force_update, using, update_fields)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 827, in _save_table
forced_update)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 877, in _do_update
return filtered._update(values) > 0
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/query.py", line 580, in _update
return query.get_compiler(self.db).execute_sql(CURSOR)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 1062, in execute_sql
cursor = super(SQLUpdateCompiler, self).execute_sql(result_type)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 840, in execute_sql
cursor.execute(sql, params)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 79, in execute
return super(CursorDebugWrapper, self).execute(sql, params)
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 59, in execute
self.db.validate_no_broken_transaction()
File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/base/base.py", line 327, in validate_no_broken_transaction
"An error occurred in the current transaction. You can't "
TransactionManagementError: An error occurred in the current transaction. You can't execute queries until the end of the 'atomic' block.
As from the traceback, the error is being pointed out in the following code:
def perform_create(self, serializer):
is_survey = self.request.data.get('is_survey', False)
fxf = serializer.save(is_survey=is_survey, is_deployed=True)
if not fxf.project:
fxf.from_project = False
fxf.save() #<<--here
if fxf.project:
if not fxf.is_survey:
org = fxf.project.organization
fxf.logs.create(source=self.request.user, type=18, title="General",
organization=org,
project = fxf.project,
content_object=fxf,
extra_object=fxf.project,
description='{0} assigned new General form {1} to {2} '.format(
self.request.user.get_full_name(),
fxf.xf.title,
fxf.project.name))
else:
org = fxf.site.project.organization
fxf.logs.create(source=self.request.user, type=19, title="General",
organization=org,
project=fxf.site.project,
site = fxf.site,
content_object=fxf,
extra_object=fxf.site,
description='{0} assigned new General form {1} to {2} '.format(
self.request.user.get_full_name(),
fxf.xf.title,
fxf.site.name
))
perform_create
is a method inside a viewset which is used when a form is assigned to a project.
From the error It's look like you have atomic transaction enabled either on Database level or at the view Level.
After reading your code, The issue looks something like this to me.
You created a new FieldSightXF object from the perform_create view,But as the atomic transactions are enabled the the object is not actually stored in database yet It will be stored when the whole piece of code will be executed and response is returned to user.
Now when you called the fxf.save(), the post_save_signal is processed and the function share_form is called.
@receiver(post_save, sender=FieldSightXF)
def share_form(sender, instance, created, **kwargs):
if instance.project is not None and created:
from onadata.apps.fsforms.tasks import share_form_managers
task_obj = CeleryTaskProgress.objects.create(user=instance.xf.user,
description="Share Forms",
task_type=17, content_object=instance)
if task_obj:
try:
share_form_managers.delay(instance.id, task_obj.id)
except:
pass
post_save.connect(create_messages, sender=FieldSightXF)
In share_form function you called the task share_form_managers with the id of the FieldSightXF object.
Now when celery executes your task and in doing so hits database to look for FieldSightXF object with id=fxf and for CeleryTaskProgress object with id=task_id, It doesn't find it because It's not there in the database yet and raises a DB error.
@shared_task(max_retires=5)
def share_form_managers(fxf, task_id):
fxf = FieldSightXF.objects.get(pk=fxf) #<<--here
userrole = UserRole.objects.filter(project=fxf.project, group__name='Project Manager')
users = User.objects.filter(user_roles__in=userrole)
shared = share_form(users, fxf.xf)
if shared:
CeleryTaskProgress.objects.filter(id=task_id).update(status=2)
else:
CeleryTaskProgress.objects.filter(id=task_id).update(status=3)
If a database error occurs within a transaction block, django prevents any further database queries to stop corrupted data read/write.
In the perform_create function, you are trying to read/write data after the fxf.save() , so django is raising transaction Error.
def perform_create(self, serializer):
is_survey = self.request.data.get('is_survey', False)
fxf = serializer.save(is_survey=is_survey, is_deployed=True)
if not fxf.project:
fxf.from_project = False
fxf.save() #<<--here
if fxf.project:
if not fxf.is_survey:
org = fxf.project.organization
fxf.logs.create(source=self.request.user, type=18, title="General",
organization=org,
project = fxf.project,
content_object=fxf,
extra_object=fxf.project,
description='{0} assigned new General form {1} to {2} '.format(
self.request.user.get_full_name(),
fxf.xf.title,
fxf.project.name))
else:
org = fxf.site.project.organization
fxf.logs.create(source=self.request.user, type=19, title="General",
organization=org,
project=fxf.site.project,
site = fxf.site,
content_object=fxf,
extra_object=fxf.site,
description='{0} assigned new General form {1} to {2} '.format(
self.request.user.get_full_name(),
fxf.xf.title,
fxf.site.name
))
What I would suggest is instead of using a post_save signal, call the share_form_managers task from perform_create view like this:
from django.db import transaction
def perform_create(self, serializer):
is_survey = self.request.data.get('is_survey', False)
fxf = serializer.save(is_survey=is_survey, is_deployed=True)
if not fxf.project:
fxf.from_project = False
fxf.save()
if fxf.project:
# calling the task
from onadata.apps.fsforms.tasks import share_form_managers
task_obj = CeleryTaskProgress.objects.create(user=fxf.xf.user, description="Share Forms", task_type=17, content_object=instance)
if task_obj:
try:
transaction.on_commit(lambda: share_form_managers.delay(instance.id, task_obj.id))
except:
pass
# call block end
if not fxf.is_survey:
org = fxf.project.organization
fxf.logs.create(source=self.request.user, type=18, title="General", organization=org, project = fxf.project, content_object=fxf,extra_object=fxf.project, description='{0} assigned new General form {1} to {2} '.format(self.request.user.get_full_name(),fxf.xf.title,fxf.project.name))
else:
org = fxf.site.project.organization
fxf.logs.create(source=self.request.user, type=19, title="General", organization=org, project=fxf.site.project, site = fxf.site, content_object=fxf, extra_object=fxf.site, description='{0} assigned new General form {1} to {2} '.format(self.request.user.get_full_name(),fxf.xf.title,fxf.site.name ))
Let me know if it helps.
Here is a reference to django's transaction management: https://docs.djangoproject.com/en/2.2/topics/db/transactions/