Search code examples
djangodjango-modelscelerydjango-celerycelerybeat

How do i create seperate tasks for every objects with celery beat in Django models


Let's assume we have following model field:

class Project(models.Model):
    project_name = models.CharField(max_length=200,unique=True)
    project_scan = models.IntegerField()  ### Scan interval
    project_status = models.BooleanField() ### To Enable "Scan" or Disable "Scan" Tasks

Assume We have 2 Project Objects:

1. Project(project_name='test1',project_scan=5) ### Scan `test1` every `5` hour
2. Project(project_name='test2',project_scan=10) ### Scan `test2` every `10` hour

Tasks.py

@task(name='project_tasks')
def Project_Tasks():
    get_all_projects = Project.objects.all()
    for each_project in get_all_project:
        if each_project.project_status == True: ### Checking if it "Scan" is allowed.
            get_interval = each_project.project_scan
            get_name = each_project.project_name
            print(get_name)

My Question : How do i run tasks on each object based on given project_scan Interval ? , Since Celery beat takes Tasks name as argument to perform scan like: PeriodicTask.objects.create(interval=given_interval, name='I dont know', task='project_tasks', )

How do i create separate instance for each project task ?

I Tried creating intervalSchedule field in models.py but didn't worked:

class Project(models.Model):
    project_name = models.CharField(max_length=200,unique=True)
    project_scan = models.IntegerField()  ### Scan interval
    project_status = models.BooleanField() ### To Enable "Scan" or Disable "Scan" Tasks
    schedule = IntervalSchedule()

Solution

  • You can add use signals:

    from django.db.models.signals import post_save
    from django.dispatch import receiver
    
    from django_celery_beat.models import PeriodicTask, IntervalSchedule
    
    
    class Project(models.Model):
        project_name = models.CharField(max_length=200,unique=True)
        project_scan = models.IntegerField()  
        project_status = models.BooleanField()
    
    
        def set_periodic_task(self, task_name):
            schedule = self.get_or_create_interval()
            PeriodicTask.objects.create(
                interval=schedule, 
                name=f'{self.project_name}-{self.id}', 
                task=task_name,
            )
    
        def get_or_create_interval(self):
            schedule, created = IntervalSchedule.objects.get_or_create(
                every=self.project_scan,
                period=IntervalSchedule.HOURS,
            )
            return schedule
    
        def get_periodic_task(self, task_name):
            interval = self.get_or_create_interval()
            periodic_task = PeriodicTask.objects.get(
                interval=interval, 
                name=f'{self.project_name}-{self.id}', 
                task=task_name,
            )
            return periodic_task
    
        def sync_disable_enable_task(self, task_name):
            periodic_task = self.get_periodic_task(task_name)
            periodic_task.enabled = self.project_status
            periodic_task.save()
    
    
    @receiver(post_save, sender=Project)
    def set_or_sync_periodic_task(sender, instance=None, created=False, **kwargs):
        if created:
            instance.set_periodic_task(task_name='project_tasks')
        else:
            instance.sync_disable_enable_task(task_name='project_tasks')
    

    What you have there: When you created a new Project instance a new Periodic task is saved with method set_periodic_task. If you want to disable or enable periodic task of your instance you just change project_status status and save it. It will trigger sync_disable_enable_task method for enabling or disabling.

    if you want to pass args you can do:

     PeriodicTask.objects.create(
         interval=schedule,
         name=f'{self.project_name}-{self.id}', 
         task='proj.tasks.import_contacts',
         args=json.dumps(['arg1', 'arg2']),
         kwargs=json.dumps({
            'some_kwarg': '123,
         }),
    )