
DASK with local files on WORKER systems


I am working with multiple systems as workers. Each worker system has part of the data stored locally, and I want each worker to run the computation only on its own file.

I have tried using:

distributed.scheduler.decide_worker()

send_task_to_worker(worker, key)

but I could not automate assigning a task to each file.

Also, is there any way I can access the local files of a worker? Using the TCP address, I only have access to a temporary folder that Dask created for the worker.


Solution

  • You can target computations to run on certain workers by passing the workers= keyword to the various methods on the client. See http://distributed.readthedocs.io/en/latest/locality.html#user-control for more information.

    You might run a function on each of your workers that tells you which files are present:

    >>> client.run(os.listdir, my_directory)
    {'192.168.0.1:52523': ['myfile1.dat', 'myfile2.dat'],
     '192.168.0.2:4244': ['myfile3.dat'],
     '192.168.0.3:5515': ['myfile4.dat', 'myfile5.dat']}
    

    You might then submit computations to run on those workers specifically.

    future = client.submit(load, 'myfile1.dat', workers='192.168.0.1:52523')
    
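    To automate the assignment for every file, you can loop over the mapping returned by client.run and pin one task per file to the worker that holds it. A minimal sketch, assuming the files live under a hypothetical /data directory, load is your own single-file reader, and the scheduler address is a placeholder:

    import os
    from dask.distributed import Client

    client = Client('scheduler-address:8786')   # placeholder scheduler address

    def load(path):
        # placeholder reader: return the raw bytes of one locally stored file
        with open(path, 'rb') as f:
            return f.read()

    # ask every worker which files it holds locally
    files_per_worker = client.run(os.listdir, '/data')

    # submit one task per file, restricted to the worker that owns that file
    futures = [client.submit(load, os.path.join('/data', name), workers=worker)
               for worker, names in files_per_worker.items()
               for name in names]

    results = client.gather(futures)

    Because each submit call names a specific worker, Dask schedules that task only there, so the file is read straight from that worker's local disk.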

    If you are using dask.delayed you can also pass workers= to the `persist` method. See http://distributed.readthedocs.io/en/latest/locality.html#user-control for more information.
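    A minimal sketch of that pattern, reusing the hypothetical load function and the assumed /data paths from above:

    import dask

    lazy1 = dask.delayed(load)('/data/myfile1.dat')
    lazy2 = dask.delayed(load)('/data/myfile3.dat')

    # pin each piece to the worker that holds its file
    part1 = client.persist(lazy1, workers='192.168.0.1:52523')
    part2 = client.persist(lazy2, workers='192.168.0.2:4244')

    Persisting keeps each result in that worker's memory, so later computations built on part1 and part2 stay close to the data.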