I have a set of long-running tasks I need to coordinate as a group. The challenge is that the tasks must be fired as soon as they can be created, not after all tasks have been defined (as shown below).
The problem is that group(...) does not accept AsyncResult objects, and calling apply_async on each task results in every task running twice.
import time

import numpy as np
from celery import group, chain, chord, subtask

# Run it
pipeline = rate_limited_task.s(end_task.s())
job_id_to_track = pipeline.apply_async()


# TASKS
@app.task(bind=True)
def rate_limited_task(self, downstream_task):
    tasks = []
    for random_number in np.random.randint(1, 10, 1000):
        # ... imagine it takes several seconds to generate a random number.
        # This will loop many times.
        task = parallel_tasks.si(random_number)  # This needs to fire immediately
        tasks.append(task)
    pipeline = group(tasks) | downstream_task
    return pipeline()


@app.task(bind=True)
def parallel_tasks(self, data):
    # Another long-running task
    print(f'sleeping for {data} seconds')
    time.sleep(data)
    return data


@app.task(bind=True)
def end_task(self, results):
    print('End task')
    print(results)
Question: Is it possible to create a group of tasks that are already running (or in any state)?
Current solution (not ideal)
from celery import chain
from celery.result import allow_join_result, GroupResult
from celery.utils import uuid


@app.task(bind=True)
def rate_limited_task(self, downstream_task):
    tasks = []
    for random_number in np.random.randint(1, 10, 1000):
        # ... imagine it takes several seconds to generate a random number.
        # This will loop many times.
        task = parallel_tasks.apply_async([random_number])
        tasks.append(task)

    gid = uuid()
    result_set = GroupResult(id=gid, results=tasks)
    result_set.save(backend=self.backend)

    chain(
        group_tasks.s(gid),
        downstream_task,
    )()
    ...


@app.task(bind=True)
def group_tasks(self, group_task_id):
    group_task = GroupResult.restore(group_task_id, app=app, backend=self.backend)

    # Less than ideal solution.
    # Needs logic to handle any failed tasks (solved in the group class).
    # Tasks should have an expiration time so that they are not retried forever.
    if not group_task.ready():
        raise self.retry(countdown=1)

    with allow_join_result():
        results = group_task.join()
    return results
You cannot create a group of already-existing tasks, because Celery's groups are collections of lazily evaluated signatures, not tasks. (The group itself executes the signatures, so the idea of "adding" already-running tasks is a non-starter.)
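To make the distinction concrete, here is a minimal sketch, reusing parallel_tasks from the question's code (illustration only, not a fix):

# Signatures are recipes; nothing is sent to the broker until the group is applied.
sigs = [parallel_tasks.si(n) for n in (1, 2, 3)]
g = group(sigs)
result = g.apply_async()  # the three tasks are only dispatched here

# AsyncResult objects, by contrast, refer to tasks that have already been sent,
# so there is nothing left for a group to execute.
already_running = [parallel_tasks.apply_async([n]) for n in (1, 2, 3)]
# group(already_running)  # not meaningful: these are results, not signatures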
If you really want to do this, you can reach for Celery's vine.synchronization.barrier implementation, which supports adding tasks incrementally and waiting for everything to be done before moving on. That said, as you mentioned, you would have to implement almost all of apply_async and __apply_async from Celery's group class to make it viable.
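For reference, here is a rough, standalone sketch of that barrier primitive, with plain vine promises standing in for "a task finished" callbacks; wiring this up to real Celery task results is exactly the part that makes the route impractical:

from vine import promise
from vine.synchronization import barrier

def all_done():
    print('every tracked promise has been fulfilled')

b = barrier()
b.then(all_done)

p1 = promise()  # stand-ins for per-task completion callbacks
p2 = promise()
b.add(p1)       # members can be added as they are created
b.add(p2)
b.finalize()    # declare that nothing else will be added

p1()            # fulfil each promise as its task completes
p2()            # all_done() fires once the last one is fulfilled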
So, tl;dr, it's probably not worth it, because you can pass a generator into a group, and each task will start as soon as its value is passed from the generator to the group.
Here is a PoC that shows that this works:
import os
import random
import time

from celery import Celery, group

app = Celery(
    'tasks',
    broker=os.environ.get('BROKER'),
    backend=os.environ.get('BACKEND'),
    enable_utc=True,
)


@app.task(bind=True)
def end_task(self, results):
    print(f'{self.request.id}: end task')
    print(results)
    return results


@app.task(bind=True)
def parallel_task(self, x):
    print(f'sleeping for {60 / x}s')
    time.sleep(60 / x)
    return x + 5


def slow_random(high):
    time.sleep(15)  # imagine each value takes a long time to produce
    value = random.randint(1, high)
    return value


@app.task(bind=True)
def rate_limited_task(self, downstream_task):
    # The generator is consumed lazily, so each parallel_task is sent
    # as soon as slow_random() produces its value.
    g = group(parallel_task.si(slow_random(10)) for _ in range(10))
    pipeline = g | downstream_task
    return pipeline.delay().id


pipeline = rate_limited_task.s(end_task.s())
job_id_to_track = pipeline.delay().get()
print(job_id_to_track)
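If you also need to follow the inner pipeline from the caller, one option is to look up the returned id (assuming a result backend is configured; job_id_to_track here should be the id of the chord's end_task callback):

from celery.result import AsyncResult

inner = AsyncResult(job_id_to_track, app=app)
print(inner.get(timeout=600))  # blocks until end_task has finished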