Search code examples
pythondjangodjango-celerydjango-signals

Django: Post-Save Signal TransactionManagementError


I am using a post-save signal on a model to start a Celery task. It was working before, but suddenly it is giving me a TransactionManagementError.

Model

class FieldSightXF(models.Model):
    """Assignment of an XForm to a site and/or a project.

    Rows are stored in the ``fieldsight_forms_data`` table and are ordered
    newest-first by ``date_created``.
    """
    xf = models.ForeignKey(XForm, related_name="field_sight_form")
    # Both site and project are optional at the database level.
    site = models.ForeignKey(Site, related_name="site_forms", null=True, blank=True)
    project = models.ForeignKey(Project, related_name="project_forms", null=True, blank=True)
    is_staged = models.BooleanField(default=False)
    is_scheduled = models.BooleanField(default=False)
    # auto_now_add stamps the row once, on creation.  The previous
    # auto_now=True rewrote this value on every save, which made it identical
    # to date_modified and turned the "-date_created" ordering into
    # "most recently modified first".
    date_created = models.DateTimeField(auto_now_add=True)
    date_modified = models.DateTimeField(auto_now=True)
    schedule = models.OneToOneField(Schedule, blank=True, null=True, related_name="schedule_forms")
    stage = models.OneToOneField(Stage, blank=True, null=True, related_name="stage_forms")
    shared_level = models.IntegerField(default=2, choices=SHARED_LEVEL)
    form_status = models.IntegerField(default=0, choices=FORM_STATUS)
    # NOTE(review): the reverse accessor of this self-reference is named
    # "parent" although it yields the rows pointing *at* this one -- confirm
    # the intended direction before relying on it.
    fsform = models.ForeignKey('self', blank=True, null=True, related_name="parent")
    is_deployed = models.BooleanField(default=False)
    is_deleted = models.BooleanField(default=False)
    is_survey = models.BooleanField(default=False)
    from_project = models.BooleanField(default=True)
    default_submission_status = models.IntegerField(default=0, choices=FORM_STATUS)
    logs = GenericRelation('eventlog.FieldSightLog')

    class Meta:
        db_table = 'fieldsight_forms_data'
        # unique_together = (("xf", "site"), ("xf", "is_staged", "stage"),("xf", "is_scheduled", "schedule"))
        verbose_name = _("XForm")
        verbose_name_plural = _("XForms")
        ordering = ("-date_created",)

Post-save signal

@receiver(post_save, sender=FieldSightXF)
def share_form(sender, instance, created,  **kwargs):
    """Queue the form-sharing Celery task when a project-level form is created.

    Called on every post_save of FieldSightXF; it only acts when the row was
    just created and has a project.
    """
    if instance.project is not None and created:
        # Imported at call time rather than at module level
        # (presumably to avoid a circular import -- confirm).
        from onadata.apps.fsforms.tasks import share_form_managers
        # Progress row whose id is handed to the task so it can report its outcome.
        task_obj = CeleryTaskProgress.objects.create(user=instance.xf.user,
                                                     description="Share Forms",
                                                     task_type=17, content_object=instance)
        if task_obj:
            try:
                # NOTE(review): this is dispatched while the saving code may
                # still be inside an open transaction, so the rows referenced
                # by these ids may not be committed yet.
                share_form_managers.delay(instance.id, task_obj.id)
            except:
                # NOTE(review): the bare except hides every failure of the
                # dispatch.  If that failure is a database error raised inside
                # an atomic block, the transaction is left unusable and the
                # next query fails with TransactionManagementError -- which
                # matches the reported traceback; confirm.
                pass

# create_messages is defined outside this snippet.
post_save.connect(create_messages, sender=FieldSightXF)

The CeleryTaskProgress is used to track the progress of the celery tasks.

Task

# The option was previously misspelled "max_retires", so the intended retry
# limit was never applied under its real name.  NOTE(review): the task never
# calls retry(), so the limit only matters once retries are actually added.
@shared_task(max_retries=5)
def share_form_managers(fxf, task_id):
    """Share a form with the project managers of its project and record the result.

    :param fxf: primary key of the FieldSightXF row (rebound to the instance below).
    :param task_id: primary key of the CeleryTaskProgress row tracking this run.
    :raises FieldSightXF.DoesNotExist: when no row with that primary key is visible.
    """
    fxf = FieldSightXF.objects.get(pk=fxf)
    # Users holding the "Project Manager" role on the form's project.
    userrole = UserRole.objects.filter(project=fxf.project, group__name='Project Manager')
    users = User.objects.filter(user_roles__in=userrole)
    # share_form here is the (users, xform) helper, not the post_save receiver
    # that carries the same name.
    shared = share_form(users, fxf.xf)
    # Status 2 is written on success and 3 otherwise
    # (presumably "completed" / "failed" -- confirm against CeleryTaskProgress).
    if shared:
        CeleryTaskProgress.objects.filter(id=task_id).update(status=2)
    else:
        CeleryTaskProgress.objects.filter(id=task_id).update(status=3)

share_form method

def share_form(users, xform):
    """Grant each user view/change permission on the Asset behind *xform*.

    The previous version returned from inside the loop, so only the first user
    ever received the permissions.  All users are processed now.

    :param users: iterable of users that should get access.
    :param xform: form whose ``id_string`` equals the ``uid`` of the Asset.
    :returns: True when permissions were created for at least one user;
        False when ``users`` is empty or any lookup/insert fails.  Rows created
        before a failure are not rolled back here.
    """
    import logging
    from onadata.apps.fsforms.models import ObjectPermission, Asset

    codenames = ['view_asset', 'change_asset']
    shared = False
    try:
        # These lookups do not depend on the user, so run them once.
        permissions = list(Permission.objects.filter(
            content_type__app_label='kpi', codename__in=codenames))
        object_id = Asset.objects.get(uid=xform.id_string).id
        # NOTE(review): hard-coded primary key; presumably the content type of
        # Asset -- confirm, and prefer ContentType.objects.get_for_model(Asset)
        # if so, because ids differ between databases.
        content_type = ContentType.objects.get(id=21)

        for user in users:
            for perm in permissions:
                # Create the new permission
                ObjectPermission.objects.create(
                    object_id=object_id,
                    content_type=content_type,
                    user=user,
                    permission_id=perm.pk,
                    deny=False,
                    inherited=False
                )
            shared = True
    except Exception:
        # Keep the "report failure as False" contract, but leave a trace
        # instead of swallowing the error silently.
        logging.getLogger(__name__).exception(
            "Sharing form %s failed", getattr(xform, 'id_string', None))
        return False
    return shared

What this process does: whenever a FieldSightXF object is created (i.e. a form is assigned to a project), the form is shared with the project managers of that project.

Previously there was no problem as I was passing the FieldSightXF object as a parameter to the task, but now I am passing the object id as:

Previously

share_form_managers.delay(instance, task_obj.id)

Current

share_form_managers.delay(instance.id, task_obj.id) 

Now both cases give me the mentioned error. If I comment out the above line in the post-save signal handler, the error is gone.

I did try the suggestions from other answers that were given to similar problem but they did not work for me.

Full Error Traceback:

ERROR 2019-06-13 10:49:50,743 base 11527 140653367232256 Internal Server Error: /forms/api/fxf/
Traceback (most recent call last):
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/core/handlers/base.py", line 132, in get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 58, in wrapped_view
    return view_func(*args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/viewsets.py", line 87, in view
    return self.dispatch(request, *args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 466, in dispatch
    response = self.handle_exception(exc)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 463, in dispatch
    response = handler(request, *args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/mixins.py", line 21, in create
    self.perform_create(serializer)
  File "/home/sanip/naxa/source/fieldsight/onadata/apps/fsforms/viewsets/FieldSightXformViewset.py", line 85, in perform_create
    fxf.save()
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 734, in save
    force_update=force_update, update_fields=update_fields)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 762, in save_base
    updated = self._save_table(raw, cls, force_insert, force_update, using, update_fields)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 827, in _save_table
    forced_update)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 877, in _do_update
    return filtered._update(values) > 0
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/query.py", line 580, in _update
    return query.get_compiler(self.db).execute_sql(CURSOR)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 1062, in execute_sql
    cursor = super(SQLUpdateCompiler, self).execute_sql(result_type)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 840, in execute_sql
    cursor.execute(sql, params)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 79, in execute
    return super(CursorDebugWrapper, self).execute(sql, params)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 59, in execute
    self.db.validate_no_broken_transaction()
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/base/base.py", line 327, in validate_no_broken_transaction
    "An error occurred in the current transaction. You can't "
TransactionManagementError: An error occurred in the current transaction. You can't execute queries until the end of the 'atomic' block.
Internal Server Error: /forms/api/fxf/
Traceback (most recent call last):
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/core/handlers/base.py", line 132, in get_response
    response = wrapped_callback(request, *callback_args, **callback_kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/views/decorators/csrf.py", line 58, in wrapped_view
    return view_func(*args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/viewsets.py", line 87, in view
    return self.dispatch(request, *args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 466, in dispatch
    response = self.handle_exception(exc)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/views.py", line 463, in dispatch
    response = handler(request, *args, **kwargs)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/rest_framework/mixins.py", line 21, in create
    self.perform_create(serializer)
  File "/home/sanip/naxa/source/fieldsight/onadata/apps/fsforms/viewsets/FieldSightXformViewset.py", line 85, in perform_create
    fxf.save()
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 734, in save
    force_update=force_update, update_fields=update_fields)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 762, in save_base
    updated = self._save_table(raw, cls, force_insert, force_update, using, update_fields)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 827, in _save_table
    forced_update)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/base.py", line 877, in _do_update
    return filtered._update(values) > 0
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/query.py", line 580, in _update
    return query.get_compiler(self.db).execute_sql(CURSOR)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 1062, in execute_sql
    cursor = super(SQLUpdateCompiler, self).execute_sql(result_type)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/models/sql/compiler.py", line 840, in execute_sql
    cursor.execute(sql, params)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 79, in execute
    return super(CursorDebugWrapper, self).execute(sql, params)
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/utils.py", line 59, in execute
    self.db.validate_no_broken_transaction()
  File "/home/sanip/.virtualenvs/kobocat/lib/python2.7/site-packages/django/db/backends/base/base.py", line 327, in validate_no_broken_transaction
    "An error occurred in the current transaction. You can't "
TransactionManagementError: An error occurred in the current transaction. You can't execute queries until the end of the 'atomic' block.

As from the traceback, the error is being pointed out in the following code:

def perform_create(self, serializer):
    """Save the new FieldSightXF from the serializer and log the assignment.

    Project-level forms get an event-log entry of type 18 (skipped for survey
    forms); forms without a project get a site-level entry of type 19.
    """
    is_survey = self.request.data.get('is_survey', False)
    # Creates the object from the validated serializer data.
    fxf = serializer.save(is_survey=is_survey, is_deployed=True)
    if not fxf.project:
        fxf.from_project = False
    # Second save of the same object.  The traceback shows it running as an
    # UPDATE (_do_update), and this is where TransactionManagementError surfaces.
    fxf.save() #<<--here
    if fxf.project:
        if not fxf.is_survey:    
            org = fxf.project.organization
            fxf.logs.create(source=self.request.user, type=18, title="General",
                      organization=org,
                      project = fxf.project,
                      content_object=fxf,
                      extra_object=fxf.project,
                      description='{0} assigned new General form  {1} to {2} '.format(
                          self.request.user.get_full_name(),
                          fxf.xf.title,
                          fxf.project.name))
    else:
        # NOTE(review): assumes fxf.site is set whenever fxf.project is not;
        # the model allows both to be NULL -- confirm.
        org = fxf.site.project.organization

        fxf.logs.create(source=self.request.user, type=19, title="General",
                                          organization=org,
                                          project=fxf.site.project,
                                          site = fxf.site,
                                          content_object=fxf,
                                          extra_object=fxf.site,
                                          description='{0} assigned new General form  {1} to {2} '.format(
                                              self.request.user.get_full_name(),
                                              fxf.xf.title,
                                              fxf.site.name
                                          ))

perform_create is a method inside a viewset which is used when a form is assigned to a project.


Solution

  • From the error, it looks like you have atomic transactions enabled, either at the database level (e.g. ATOMIC_REQUESTS) or at the view level.

    After reading your code, The issue looks something like this to me.

    • You created a new FieldSightXF object in the perform_create view, but because atomic transactions are enabled, the object is not actually committed to the database yet. It will be committed once the whole block of code has executed and the response is returned to the user.

    • Now when you called the fxf.save(), the post_save_signal is processed and the function share_form is called.

      @receiver(post_save, sender=FieldSightXF)
      def share_form(sender, instance, created,  **kwargs):
          """Queue the form-sharing Celery task when a project-level form is created."""
          if instance.project is not None and created:
              from onadata.apps.fsforms.tasks import share_form_managers
              # Progress row whose id is handed to the task.
              task_obj = CeleryTaskProgress.objects.create(user=instance.xf.user,
                                                   description="Share Forms",
                                                   task_type=17, content_object=instance)
              if task_obj:
                  try:
                      # Dispatched immediately, i.e. possibly before the
                      # surrounding transaction has committed.
                      share_form_managers.delay(instance.id, task_obj.id)
                  except:
                      # NOTE(review): bare except hides any dispatch failure.
                      pass
      
      # create_messages is defined outside this snippet.
      post_save.connect(create_messages, sender=FieldSightXF)
      
    • In share_form function you called the task share_form_managers with the id of the FieldSightXF object.

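    • Now, when Celery executes your task and hits the database to look for the FieldSightXF object with id=fxf and the CeleryTaskProgress object with id=task_id, it doesn't find them, because they have not been committed yet, and a DB error is raised. Note that this can only break the request's own transaction if the failing query runs on the request's connection, e.g. when tasks run eagerly or when the .delay() call itself touches the database; in that case the bare except: pass in the signal handler hides the original error, and it only resurfaces at the next query.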

      # The option was previously misspelled "max_retires".
      @shared_task(max_retries=5)
      def share_form_managers(fxf, task_id):
          """Share a form with its project's managers and record the result.

          fxf is the primary key of the FieldSightXF row; task_id is the primary
          key of the CeleryTaskProgress row tracking this run.
          """
          fxf = FieldSightXF.objects.get(pk=fxf) #<<--here
          userrole = UserRole.objects.filter(project=fxf.project, group__name='Project Manager')
          users = User.objects.filter(user_roles__in=userrole)
          shared = share_form(users, fxf.xf)
          # Status 2 is written on success and 3 otherwise.
          if shared:
              CeleryTaskProgress.objects.filter(id=task_id).update(status=2)
          else:
              CeleryTaskProgress.objects.filter(id=task_id).update(status=3)
      
    • If a database error occurs within a transaction block, django prevents any further database queries to stop corrupted data read/write.

    • In the perform_create function, you try to read/write data after the transaction has been broken (the fxf.save() call), so Django raises the TransactionManagementError.

      def perform_create(self, serializer):
          """Save the new FieldSightXF from the serializer and log the assignment."""
          is_survey = self.request.data.get('is_survey', False)
          # Creates the object from the validated serializer data.
          fxf = serializer.save(is_survey=is_survey, is_deployed=True)
          if not fxf.project:
              fxf.from_project = False
          # Second save of the same object; the traceback points at this call.
          fxf.save() #<<--here
          if fxf.project:
              # Project-level form: logged with type 18 unless it is a survey.
              if not fxf.is_survey:    
                  org = fxf.project.organization
                  fxf.logs.create(source=self.request.user, type=18, title="General",
                    organization=org,
                    project = fxf.project,
                    content_object=fxf,
                    extra_object=fxf.project,
                    description='{0} assigned new General form  {1} to {2} '.format(
                        self.request.user.get_full_name(),
                        fxf.xf.title,
                        fxf.project.name))
          else:
              # Form without a project: logged against the site with type 19.
              org = fxf.site.project.organization
      
              fxf.logs.create(source=self.request.user, type=19, title="General",
                                        organization=org,
                                        project=fxf.site.project,
                                        site = fxf.site,
                                        content_object=fxf,
                                        extra_object=fxf.site,
                                        description='{0} assigned new General form  {1} to {2} '.format(
                                            self.request.user.get_full_name(),
                                            fxf.xf.title,
                                            fxf.site.name
                                        ))
      

    What I would suggest is instead of using a post_save signal, call the share_form_managers task from perform_create view like this:

        from django.db import transaction
    
        def perform_create(self, serializer):
            """Create the FieldSightXF, schedule form sharing and log the assignment.

            The Celery task is registered with ``transaction.on_commit`` so it
            is dispatched only after the surrounding transaction has committed.
            The ``share_form`` post_save receiver has to be removed when this
            version is used, otherwise the task is queued twice.

            Requires ``CeleryTaskProgress`` to be imported in this module and
            ``transaction.on_commit`` (Django 1.9+).
            """
            import logging

            is_survey = self.request.data.get('is_survey', False)
            fxf = serializer.save(is_survey=is_survey, is_deployed=True)
            if not fxf.project:
                fxf.from_project = False
            fxf.save()
            if fxf.project:
                # calling the task
                from onadata.apps.fsforms.tasks import share_form_managers
                # The snippet previously referenced ``instance`` here, a name
                # that only exists in the signal handler (NameError in a view).
                task_obj = CeleryTaskProgress.objects.create(
                    user=fxf.xf.user, description="Share Forms",
                    task_type=17, content_object=fxf)

                def queue_share_task(fxf_id=fxf.id, task_id=task_obj.id):
                    # Runs after commit.  Dispatch stays best effort, as in the
                    # original try/except, but a failure is logged instead of
                    # being silently discarded.
                    try:
                        share_form_managers.delay(fxf_id, task_id)
                    except Exception:
                        logging.getLogger(__name__).exception(
                            "Could not queue share_form_managers for form %s", fxf_id)

                transaction.on_commit(queue_share_task)
                # call block end
                if not fxf.is_survey:
                    org = fxf.project.organization
                    fxf.logs.create(
                        source=self.request.user, type=18, title="General",
                        organization=org, project=fxf.project,
                        content_object=fxf, extra_object=fxf.project,
                        description='{0} assigned new General form  {1} to {2} '.format(
                            self.request.user.get_full_name(),
                            fxf.xf.title,
                            fxf.project.name))
            else:
                org = fxf.site.project.organization

                fxf.logs.create(
                    source=self.request.user, type=19, title="General",
                    organization=org, project=fxf.site.project, site=fxf.site,
                    content_object=fxf, extra_object=fxf.site,
                    description='{0} assigned new General form  {1} to {2} '.format(
                        self.request.user.get_full_name(),
                        fxf.xf.title,
                        fxf.site.name))
    
    • transaction.on_commit ensures that the task is queued only after the data written by the perform_create view has been committed to the database. Note that transaction.on_commit is available from Django 1.9 onwards.

    Let me know if it helps.

    Here is a reference to django's transaction management: https://docs.djangoproject.com/en/2.2/topics/db/transactions/