Search code examples
python · celery · django-celery · celery-task

How to parallelize subtasks in celery


I have the following code, and it is working fine. However, I am iterating over metrics in a for loop inside get_host_types(). I want get_host_types() to create a subtask for each metric, each of which calls the Celery task get_host_type(), so that the subtasks can run independently on worker nodes. I then want get_host_types() to wait for the results and return them. I was thinking of using group(), but I cannot call .get() on an AsyncResult inside a task. If I don't parallelize, I am not utilizing the distributed task framework to speed up the main task.

from __future__ import print_function


from celery import Celery, group
import requests

app = Celery('celery_test')

app.config_from_object('config')

@app.task
def get_host_type(metric, alert):
    """Resolve the host types for a single (metric, alert) pair.

    NOTE(review): elsewhere in this file the alert id is read as
    ``alert['id']``; here it is ``alert['alert_id']`` — confirm which
    key the alert payload actually carries.
    """
    return get_host_types_for_alert(alert['alert_id'], metric)

class MyObject(dict):
    """Dict-shaped bundle carrying an alert together with its host types.

    Stored under the fixed keys ``alert`` and ``host_types`` so the
    object survives Celery's JSON serialization round-trip.
    """

    def __init__(self, alert, host_types):
        super().__init__(alert=alert, host_types=host_types)


@app.task(serializer='json')
def get_host_types(my_obj):
    """Map every metric of an alert to its host types, sequentially.

    ``my_obj`` is the dict produced by ``get_metrics``; returns the
    de-duplicated host types as a list.
    """
    print(f"alert get_host_types ============> {my_obj}")
    alert = my_obj['alert']['alert']
    metrics = my_obj['host_types']
    # Union of the host types resolved for each metric.
    collected = {
        host_type
        for m in metrics
        for host_type in alert_id_host_type_mapper.get_host_types_for_alert(alert['id'], m)
    }
    print(f"Return value ======> {collected}")
    return list(collected)


@app.task
def get_metrics(alert):
    """Fetch the metrics attached to an alert and bundle them with it."""
    print(f" alert ==> {alert}")
    raw_metrics = alerts_client.get_metrics(alert['alert'])
    parsed = alert_id_host_type_mapper.metric_parser(raw_metrics)
    return MyObject(alert, parsed)


@app.task
def get_alert(alert_id):
    """Look up an alert by its id via the alerts service."""
    print(f" =====> alert id {alert_id}")
    fetched = alerts_client.get_alerts(alert_id)
    return fetched


if __name__ == "__main__":
    # Chain: fetch the alert, derive its metrics, then map them to host types.
    pipeline = get_alert.s(267483) | get_metrics.s() | get_host_types.s()
    async_result = pipeline.apply_async()
    print(async_result.get())

Edit: if I call result.get() inside a subtask, I get the following error.

[2022-02-10 19:36:03,904: WARNING/ForkPoolWorker-42] Exception in thread Thread-7:
Traceback (most recent call last):
  File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner
    self.run()
  File "/home/sshil/code/statsboard/statsboard/celery_test/celery_test.py", line 30, in run
    result_int = res.get()
  File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 680, in get
    on_interval=on_interval,
  File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 793, in join_native
    assert_will_not_block()
  File "/home/sshil/venv/sb_venv/lib/python3.6/site-packages/celery/result.py", line 37, in assert_will_not_block
    raise RuntimeError(E_WOULDBLOCK)
RuntimeError: Never call result.get() within a task!
See http://docs.celeryq.org/en/latest/userguide/tasks.html#task-synchronous-subtasks

Solution

  • You can use a Celery group, as shown below:

    @app.task(serializer='json')
    def get_host_types(my_obj):
        """Fan the per-metric lookups out as a Celery group and collect results.

        Each metric becomes one ``get_host_type`` subtask; the group runs
        them in parallel on the workers.
        """
        print(f"alert get_host_types ============> {my_obj}")
        alert = my_obj['alert']['alert']
        metrics = my_obj['host_types']
        # One subtask signature per metric.
        # NOTE(review): the second positional arg here is the bare id,
        # not the alert dict — confirm it matches get_host_type's payload.
        job = group(get_host_type.s(m, alert['id']) for m in metrics)
        result = job.apply_async()
        # disable_sync_subtasks=False permits the (normally forbidden)
        # blocking .get() inside a task.
        return result.get(disable_sync_subtasks=False)
    

    For more information on Celery groups, see http://ask.github.io/celery/userguide/groups.html#groups