My computations with dask.distributed create intermediate files whose names include a UUID4 that identifies that chunk of work:
import os
import uuid

pairs = '{}\n{}\n{}\n{}'.format(list1, list2, list3, ...)
file_path = os.path.join(job_output_root, 'pairs',
                         'pairs-{}.txt'.format(str(uuid.uuid4()).replace('-', '')))
with open(file_path, 'wt') as f:
    f.write(pairs)
At the same time, every task in a dask.distributed cluster has a unique key, so it would be natural to use that key as the file name instead.
Is it possible?
There are two ways to approach the problem:
Functions like .submit accept a key= keyword argument where you can specify the key that you want used:
>>> e.submit(inc, 1, key='inc-12345')
<Future: status: pending, key: inc-12345>
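A minimal end-to-end sketch of this, assuming an in-process cluster started with Client(processes=False) just for the demo (inc is a placeholder function):

from distributed import Client

def inc(x):
    return x + 1

# in-process scheduler and worker; no separate cluster needed for the demo
client = Client(processes=False)

# key= fixes the task key instead of the default hash-based name
future = client.submit(inc, 1, key='inc-12345')
key = future.key          # 'inc-12345'
result = future.result()  # 2

client.close()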
Similarly, dask.delayed functions support a dask_key_name keyword argument:
>>> value = delayed(inc)(1, dask_key_name='inc-12345')
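The chosen name is then visible on the Delayed object's .key attribute. A short sketch (inc is again a placeholder):

from dask import delayed

def inc(x):
    return x + 1

# dask_key_name fixes the key of the generated task
value = delayed(inc)(1, dask_key_name='inc-12345')
key = value.key           # 'inc-12345'
result = value.compute()  # 2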
The worker places contextual information like this into a per-thread global during the execution of each task. As of version 1.13 this is available as follows:
def your_function(...):
    from distributed.worker import thread_state
    key = thread_state.key

future = e.submit(your_function, ...)
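Combining both ideas for the original use case, a task can name its output file after its own key. A sketch, assuming an in-process cluster; write_pairs, the sample pairs string, and the 'pairs-demo' key are illustrative placeholders:

import os
import tempfile

from distributed import Client
from distributed.worker import thread_state

def write_pairs(pairs, output_root):
    # thread_state.key is populated by the worker while this task runs
    key = thread_state.key
    file_path = os.path.join(output_root, 'pairs-{}.txt'.format(key))
    with open(file_path, 'wt') as f:
        f.write(pairs)
    return file_path

client = Client(processes=False)
# submitting with an explicit key makes the output file name predictable
future = client.submit(write_pairs, 'a b\nc d\n', tempfile.gettempdir(),
                       key='pairs-demo')
written_path = future.result()  # ends with 'pairs-demo.txt'
client.close()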