Note: the entire code for the question below is public on GitHub. Feel free to check out the project! https://github.com/b-long/moose-dj-uv/pull/3
I'm trying to work out a simple Django + Dask integration, where one view starts a long-running process and another view is able to check the status of that work. Later on, I might enhance this so that get_task_status (or some other Django view function) is able to return the output of the work.
I'm using time.sleep(2) to intentionally mimic a long-running bit of work. Also, it's important to see the overall work status as "running". To that end, I'm also using a time.sleep() in my test, which feels very silly.
Here's the view code:
from uuid import uuid4
from django.http import JsonResponse
from dask.distributed import Client
import time

# Initialize Dask client
client = Client(n_workers=8, threads_per_worker=2)
NUM_FAKE_TASKS = 25

# Dictionary to store futures with task_id as key
task_futures = {}


def long_running_process(work_list):
    def task_function(task):
        time.sleep(2)
        return task

    futures = [client.submit(task_function, task) for task in work_list]
    return futures


def start_task(request):
    work_list = []
    for t in range(NUM_FAKE_TASKS):
        task_id = str(uuid4())  # Generate a unique ID for the task
        work_list.append(
            {"address": f"foo--{t}@example.com", "message": f"Mail task: {task_id}"}
        )
    futures = long_running_process(work_list)
    dask_task_id = futures[0].key  # Use the key of the first future as the task ID
    # Store the futures in the dictionary with task_id as key
    task_futures[dask_task_id] = futures
    return JsonResponse({"task_id": dask_task_id})


def get_task_status(request, task_id):
    futures = task_futures.get(task_id)
    if futures:
        if not all(future.done() for future in futures):
            progress = 0
            return JsonResponse({"status": "running", "progress": progress})
        else:
            results = client.gather(futures, asynchronous=False)
            # Calculate progress, based on futures that are 'done'
            progress = int((sum(future.done() for future in futures) / len(futures)) * 100)
            return JsonResponse(
                {
                    "task_id": task_id,
                    "status": "completed",
                    "progress": progress,
                    "results": results,
                }
            )
    else:
        return JsonResponse({"status": "error", "message": "Task not found"})
I've written a test, which completes in about 5.5 seconds:
from django.test import Client
from django.urls import reverse
import time


def test_immediate_response_with_dask():
    client = Client()
    response = client.post(reverse("start_task_dask"), data={"data": "foo"})
    assert response.status_code == 200
    assert "task_id" in response.json()
    task_id = response.json()["task_id"]
    response2 = client.get(reverse("get_task_status_dask", kwargs={"task_id": task_id}))
    assert response2.status_code == 200
    r2_status = response2.json()["status"]
    assert r2_status == "running"
    attempts = 0
    max_attempts = 8
    while attempts < max_attempts:
        time.sleep(1)
        try:
            response3 = client.get(
                reverse("get_task_status_dask", kwargs={"task_id": task_id})
            )
            assert response3.status_code == 200
            r3_status = response3.json()["status"]
            r3_progress = response3.json()["progress"]
            assert r3_progress >= 99
            assert r3_status == "completed"
            break  # Exit the loop if successful
        except Exception:
            attempts += 1
            if attempts == max_attempts:
                raise  # Raise the last exception if all attempts failed
My question is: is there a more performant way to implement this same API? What if NUM_FAKE_TASKS = 10000?
Am I wasting cycles?
Thanks to @GuillaumeEB for the tip.
So, we know that the following is blocking:
client.gather(futures, asynchronous=False)
But it seems like this also doesn't behave the way I expect:
client.gather(futures, asynchronous=True)
Is there some way that I could use client.persist() or client.compute() to see incremental progress?
I know that I can't persist a list of <class 'distributed.client.Future'>, and using client.compute(futures) also seems to behave incorrectly (jumping the progress from 0 to 100).
I think the solution you are looking for is as_completed: https://docs.dask.org/en/latest/futures.html#waiting-on-futures.
You can also iterate over the futures as they complete using the as_completed function.
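As a rough sketch of how this might look for the views in the question: the first helper below counts finished futures with done(), which does not block, so a status view could report partial progress instead of a hard-coded 0. The second uses as_completed to hand back results in the order they finish. The names progress_snapshot and results_in_completion_order are made up for illustration and are not part of Dask, and I have not run this against the linked project, so treat it as a starting point rather than a drop-in fix.

```python
def progress_snapshot(futures):
    """Summarize a batch of futures without blocking.

    Relies only on each future's done() method, which Dask futures provide.
    """
    total = len(futures)
    done = sum(1 for future in futures if future.done())
    # An empty batch is treated as finished (an assumption of this sketch).
    percent = int(done / total * 100) if total else 100
    return {
        "status": "completed" if done == total else "running",
        "progress": percent,
        "done": done,
        "total": total,
    }


def results_in_completion_order(futures):
    """Yield each result as soon as its Dask future finishes.

    This blocks between items, so it suits a worker loop or script
    better than a request handler that must return immediately.
    """
    # Imported lazily so progress_snapshot can be used without a cluster.
    from dask.distributed import as_completed

    for future in as_completed(futures):
        yield future.result()
```

In get_task_status, something like JsonResponse(progress_snapshot(futures)) could replace the all(...) check, with client.gather(futures) called only once the status is "completed". Note that done() is also true for futures that errored or were cancelled, so a real view would probably want to check for failures separately.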