I need to run a task, "CreateNotifications", at random intervals. Here is what I tried in my Celery settings:
import random
from datetime import timedelta

t = random.randint(45, 85)
## print "time = ", t
## celery app configuration
app.conf.CELERYBEAT_SCHEDULE = {
    # Executes at every 't' interval, where t is random
    'create-notifications': {
        'task': 'apps.notifications.tasks.CreateNotifications',
        'schedule': timedelta(seconds=t),
    },
}
The problem is that these Celery settings are evaluated only once (when I run the command python manage.py runserver), so the variable 't', and with it the 'seconds' value passed to timedelta, gets a random value only once.
The result is a periodic task with a fixed period of X seconds, where X is merely chosen at random when I start the server.
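To make the effect concrete, here is a minimal sketch without Celery. The dictionary is only a stand-in for CELERYBEAT_SCHEDULE, and current_interval is a hypothetical helper added for illustration: the random draw happens once, when the module is loaded, and every later read returns that same value.

```python
import random
from datetime import timedelta

# Evaluated exactly once, when this module is imported.
t = random.randint(45, 85)

# Stand-in for the beat schedule: it stores the already-drawn value of t.
schedule = {
    "create-notifications": {"schedule": timedelta(seconds=t)},
}


def current_interval():
    """Return the stored interval in seconds; no new random draw happens here."""
    return schedule["create-notifications"]["schedule"].total_seconds()


first = current_interval()
later = [current_interval() for _ in range(5)]
```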
Alternatively, I tried running a single task containing an endless while loop with a random delay, so that Celery picks up only one task and that task never ends. The random delay in the while loop does what I need. Like this (NOTE: the 'while' is inside the function CreateNotifications()):
import random
import time

from django.db.models import Max, Min

@app.task
def CreateNotifications():
    while True:
        upper_limit = models.MyUser.objects.all().aggregate(Max('id'))
        lower_limit = models.MyUser.objects.all().aggregate(Min('id'))
        ## randomly select the user to be notified
        to_user = None
        to = 0
        while to_user is None:
            to = random.randint(lower_limit['id__min'], upper_limit['id__max'])
            try:
                to_user = models.MyUser.objects.get(id=to)
            except models.MyUser.DoesNotExist:
                pass
        ## randomly select the user the notification comes from
        frm_user = None
        frm = to
        while frm_user is None:
            while frm == to:
                frm = random.randint(lower_limit['id__min'], upper_limit['id__max'])
            try:
                frm_user = models.MyUser.objects.get(id=frm)
            except models.MyUser.DoesNotExist:
                frm = to  # no user with this id: force a fresh draw
        notif_type = ['comment on', 'liked', 'shared']
        notif_media = ['post', 'picture', 'video']
        models.Notification.objects.create(
            notified_user=to_user,
            notifier=frm_user,
            notification_type=random.choice(notif_type),
            notification_media=random.choice(notif_media))
        to_user.new_notification_count += 1
        to_user.save()
        t = random.randint(35, 55)
        print("delay = %d" % t)
        time.sleep(t)
It does exactly what I want, but now there are 4 different workers executing the same task, and I want only one.
I've tried changing the celeryd file located in my virtualenv/bin/ directory, as indicated here -> Celery. Decrease number of processes (there is no celeryd file in /etc/defaults/), but still no success.
Any help will be appreciated.
You really shouldn't put an endless loop in a Celery task. If you need a dedicated process, it would be better to run it on its own rather than through Celery.
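As a rough sketch of what such a dedicated process might look like, the loop below calls a work function and then pauses for a random number of seconds. The name run_forever and its parameters are hypothetical and chosen for illustration; do_work stands in for the notification logic, and the sleep and max_iterations arguments exist only so the loop can be tried out without waiting.

```python
import random
import time


def run_forever(do_work, low=35, high=55, sleep=time.sleep, max_iterations=None):
    """Call do_work(), then pause for a random number of seconds, repeatedly.

    max_iterations is only there so the loop can be exercised in a test;
    leave it as None for a process that should run until it is stopped.
    Returns the number of completed iterations.
    """
    done = 0
    while max_iterations is None or done < max_iterations:
        do_work()
        t = random.randint(low, high)  # fresh random delay on every pass
        sleep(t)
        done += 1
    return done
```

In a Django project this could be wrapped in a custom management command and started once, so that exactly one copy of the loop is running.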
You could reschedule a new task in the future whenever your task runs. Something like the following:
import random

@app.task
def create_notifications():
    try:
        pass  # task body goes here
    finally:
        t = random.randint(45, 85)
        create_notifications.apply_async(countdown=t)
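To see the behaviour of this pattern without a broker, here is a small simulation. It is only a sketch: the standard-library sched module stands in for Celery, scheduler.enter plays the role of apply_async(countdown=t), and run_chain is a hypothetical helper that uses a simulated clock so nothing actually sleeps. It also shows that the chain has to be started once by hand; after that each run queues the next one.

```python
import random
import sched


def run_chain(runs, low=45, high=85, seed=None):
    """Simulate a task that re-queues itself with a random countdown.

    Returns the simulated times (in seconds) at which the task ran.
    """
    rng = random.Random(seed)
    clock = [0.0]  # simulated time in seconds

    def fake_sleep(seconds):
        clock[0] += seconds  # advance simulated time instead of waiting

    scheduler = sched.scheduler(lambda: clock[0], fake_sleep)
    fired_at = []

    def create_notifications():
        try:
            fired_at.append(clock[0])  # stand-in for the real task body
        finally:
            if len(fired_at) < runs:  # stop condition exists only for the demo
                t = rng.randint(low, high)
                # Stand-in for create_notifications.apply_async(countdown=t)
                scheduler.enter(t, 1, create_notifications)

    scheduler.enter(0, 1, create_notifications)  # kick off the chain once
    scheduler.run()
    return fired_at
```

Each gap between consecutive runs is a fresh random value between low and high, which is the behaviour the fixed beat schedule could not provide.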
Another option would be to have a scheduler task that runs on a regular schedule but queues the notification tasks for a random time in the near future. For example, if the scheduler task runs every 45 seconds:
@app.task
def schedule_notifications():
    t = random.randint(0, 40)
    create_notifications.apply_async(countdown=t)
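One thing to keep in mind with this variant is the resulting spacing. With a 45-second tick and a countdown of 0 to 40 seconds, the k-th notification runs at 45k + t_k, so the gap between consecutive notifications is 45 + t_(k+1) - t_k, which lies anywhere between 5 and 85 seconds. The sketch below checks that arithmetic without Celery; notification_times is a hypothetical helper and the numbers are simply the ones from the example above.

```python
import random


def notification_times(ticks, period=45, max_delay=40, seed=None):
    """Return simulated run times: one scheduler tick every `period` seconds,
    each queuing a notification after a random countdown of 0..max_delay."""
    rng = random.Random(seed)
    return [k * period + rng.randint(0, max_delay) for k in range(ticks)]


times = notification_times(1000, seed=0)
gaps = [b - a for a, b in zip(times, times[1:])]
```

Because the maximum countdown is shorter than the tick period, the notifications never overlap or run out of order; adjusting the two numbers changes the range of possible gaps.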