Tags: python, django, oop, celery

How to avoid classmethod side effect using Celery?


I am running a class-based app using Celery, but I have noticed that when two processes run simultaneously, certain class methods are not acting independently. Here is the task that invokes the app:

import os
from PriceOptimization.celery import app
from .Tasks_Sim.sim import Sim, final_report


@app.task(name='Simulations.tasks.scoring')
def simulation(clients, deciles):
    # sim_params and my_save_path are not shown in this snippet
    s = Sim(**sim_params)

    market_by_year = s.control_flow(my_save_path)
    report = final_report(market_by_year)
    return report

Within my Sim app, I have a class method that creates IDs for my instances as follows:

class Company:
    company_id = 0

    @classmethod
    def set_company_no(cls):
        cls.company_id += 1
        return cls.company_id-1


    def __init__(self, companies, year):
        self._company_id = Company.set_company_no()
        self._company_year = year

Usually the first task completes successfully, but on the next invocation I get a "list index out of range" error. This suggests that my workers are not independent and that the company_id class attribute does not restart from zero on the next invocation. How can I prevent this side effect and have each app run independently?
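The symptom described above can be reproduced without Celery. The sketch below is a minimal illustration, assuming the two runs happen in the same Python process (for example, a worker process that is reused for a later task). `run_simulation` is a hypothetical stand-in for one task invocation and is not part of the original code; the `Company` constructor is simplified to take only `year`.

```python
class Company:
    company_id = 0  # class attribute: shared by every instance in this process

    @classmethod
    def set_company_no(cls):
        cls.company_id += 1
        return cls.company_id - 1

    def __init__(self, year):
        self._company_id = Company.set_company_no()
        self._company_year = year


def run_simulation(n_companies):
    """Hypothetical stand-in for one task invocation."""
    return [Company(2020)._company_id for _ in range(n_companies)]


first = run_simulation(3)
second = run_simulation(3)
print(first)   # [0, 1, 2]
print(second)  # [3, 4, 5]  (the counter was never reset)
```

If the IDs are later used as list indices, the second run's IDs would fall outside a list sized for three companies, which would be consistent with the reported "list index out of range" error.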


Solution

  • For now, I have chosen to run the tasks sequentially using a Redis lock:

    import time

    from settings import REDIS_INSTANCE
    # app, Sim and final_report are imported as in the question
    REDIS_LOCK_KEY = 'ABC'

    @app.task(name='Simulations.tasks.scoring')
    def simulation(clients, deciles):
        timeout = 60 * 5  # the lock expires on its own after 5 minutes
        my_lock = REDIS_INSTANCE.lock(REDIS_LOCK_KEY, timeout=timeout)
        # Poll until this task holds the lock, so only one simulation runs at a time
        while not my_lock.acquire(blocking=False):
            print('waiting for lock to commence...')
            time.sleep(10)
        try:
            print('unique process commencing...')
            s = Sim(**sim_params)
            market_by_year = s.control_flow(my_save_path)
            report = final_report(market_by_year)
        finally:
            # Release even if the simulation raises, so waiting tasks are not blocked
            my_lock.release()
        return report
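A lock serializes the tasks, but it does not by itself reset a class-level counter that survives in a reused worker process. One possible alternative, which is not part of the original answer, is to keep the counter in an object created once per simulation run. The sketch below assumes the IDs only need to be unique within a single run; `CompanyIdAllocator` and `run_simulation` are hypothetical names introduced for illustration, and the `Company` constructor is simplified.

```python
import itertools


class CompanyIdAllocator:
    """Hypothetical helper: create one allocator per simulation run."""

    def __init__(self):
        self._counter = itertools.count()  # yields 0, 1, 2, ...

    def next_id(self):
        return next(self._counter)


class Company:
    def __init__(self, year, allocator):
        # The ID comes from the run's own allocator, not from class-level state
        self._company_id = allocator.next_id()
        self._company_year = year


def run_simulation(n_companies):
    """Hypothetical stand-in for one task invocation."""
    allocator = CompanyIdAllocator()  # fresh counter for every run
    return [Company(2020, allocator)._company_id for _ in range(n_companies)]


first = run_simulation(3)
second = run_simulation(3)
print(first)   # [0, 1, 2]
print(second)  # [0, 1, 2]
```

Because no state lives on the class, two runs do not affect each other whether they execute one after another in the same process or in separate worker processes.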