I am new to Celery, and I would like advice on how best to use Celery to accomplish the following.
Suppose I have ten large datasets. I realize that I can use Celery to do work on each dataset by submitting ten tasks. But suppose that each dataset consists of 1,000,000+ text documents stored in a NoSQL database (Elasticsearch in my case). The work is performed at the document level. The work could be anything - maybe counting words.
For a given dataset, I need to start the dataset-level task. The task should read documents from the data store. Then workers should process the documents - a document-level task.
How can I do this, given that the task is defined at the dataset level, not the document level? I am trying to move away from using a JoinableQueue to store documents and submit them for work with multiprocessing.
I have read that it is possible to use multiple queues in Celery, but it is not clear to me that this is the best approach.
Let's see if this helps. You can define a workflow, add tasks to it, and then run the whole thing once it is built. Ordinary Python functions can return task signatures that can then be added to Celery primitives (chain, group, chord, etc.). See here for more info. For example, let's say you have two tasks that process documents for a given dataset:
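from celery import Celery

celery = Celery("tasks")  # assumed app instance; configure the broker as needed

def some_task(documents):
    # .si() builds an immutable signature, so a previous task's result is not passed in
    return dummy_task.si(documents)

def some_other_task(documents):
    return dummy_task.si(documents)

@celery.task(bind=True)  # bind=True is what supplies `self`
def dummy_task(self, *args, **kwargs):
    return True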
You can then provide a task that generates the subtasks like so:
from celery import chain

@celery.task()
def dataset_workflow(*args, **kwargs):
    datasets = get_datasets(*args, **kwargs)
    workflows = []
    for dataset in datasets:
        documents = get_documents(dataset)
        # One two-step workflow per dataset
        workflow = chain(some_task(documents), some_other_task(documents))
        workflows.append(workflow)
    # Link the per-dataset workflows together and submit them
    run_workflows = chain(*workflows).apply_async()
Keep in mind that generating a lot of tasks can consume a lot of memory on the Celery workers, so throttling or breaking up the task generation might be needed as you start to scale your workloads.
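One way to break the task generation up, as a rough sketch: group document IDs into fixed-size batches so that each task handles a batch rather than a single document. The helper name `batched`, the batch size, and the idea of passing document IDs (rather than full documents) are assumptions for illustration, not something from the setup above.

```python
from itertools import islice

def batched(iterable, size):
    """Yield successive lists of at most `size` items from `iterable`."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(iterable)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch

# Stand-in for document IDs read lazily from the data store (hypothetical).
doc_ids = range(10)
batches = list(batched(doc_ids, 4))

# Each batch could then become one document-level task signature,
# e.g. some_task(batch), instead of one signature per document.
```

With a million documents and a batch size of 1,000, this would produce about 1,000 signatures per dataset instead of a million. Celery also offers a `chunks` method on tasks that serves a similar purpose, which may be worth a look.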
Additionally, you can put the document-level tasks on a different queue than your workflow task if needed, based on resource constraints, etc.
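A minimal sketch of what that routing might look like, using Celery's `task_routes` setting. The module name `tasks` and the queue names `workflows` and `documents` are made up for illustration; the task names must match however your tasks are actually registered.

```python
# Hypothetical task and queue names; adjust to the real project layout.
task_routes = {
    "tasks.dataset_workflow": {"queue": "workflows"},
    "tasks.dummy_task": {"queue": "documents"},
}

# Applied to the app with: celery.conf.task_routes = task_routes
```

You would then start separate workers per queue, for example `celery -A tasks worker -Q documents` for the document-level work, and size each worker pool to the resources that queue needs.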