Tags: python, dask, dask-distributed

Why is the dict not mutated in dask?


"How does dask.delayed handle mutable inputs?" explains this for delayed, but what should I do if I need the input to be mutated?

I have been trying out Dask and I need to understand how dictionaries are mutated when functions are run through distributed.Client.get versus called sequentially (the normal way).

Sequential

def foo(dictionary):
    dictionary['foo'] = 'foo was called'

def bar(dictionary):
    dictionary['bar'] = 'bar was called'

dictionary = {}

print(dictionary) # {}
foo(dictionary)
print(dictionary) # {'foo': 'foo was called'}
bar(dictionary)
print(dictionary) # {'foo': 'foo was called', 'bar': 'bar was called'}

This works the way I expect it to: the dictionary is mutated, and I get two keys after the calls to foo and bar.

Dask

from dask.distributed import Client

client = Client(processes=False)

def foo(dictionary):
    dictionary['foo'] = 'foo was called'

def bar(dictionary):
    dictionary['bar'] = 'bar was called'

dictionary = {}

dsk = {'foo': (foo, dictionary), 'bar': (bar, dictionary)}

client.get(dsk, ['foo', 'bar'])

print(dictionary) # {}

Why does this print an empty dict? Why is it not mutated? I noticed that id(dictionary) is different inside each function, so I understand it is a copy.
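
For reference, this is roughly how I checked the ids (a minimal sketch; the printed numbers will of course differ between runs):

def foo(dictionary):
    print('foo sees id', id(dictionary))  # differs from the caller's id
    dictionary['foo'] = 'foo was called'

dictionary = {}
print('caller sees id', id(dictionary))

dsk = {'foo': (foo, dictionary)}
client.get(dsk, ['foo'])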

Is it safe to assume that every function gets its own copy of the objects passed to it, so I can mutate them within the function and leave the one in the global scope untouched? Is this understanding correct?


Solution

  • The short answer: when you pack up a graph and send it to the scheduler, which then sends it to workers, the graph gets serialised and then deserialised. Essentially, it was written with the assumption that the scheduler and workers are in other processes or on other machines. This creates new copies, so mutation has no effect on the original (the pickle round-trip sketched below shows the same effect). I believe with pickle5, larger buffer-like objects (e.g., arrays) may be zero-copy.

    With the default threaded scheduler, the graph is simply handed to the scheduler and nothing gets copied (see the threaded-scheduler sketch below: there the mutation does show up). This is a far simpler mechanism and a far simpler scheduler implementation, but it still has its uses.

    To actually mutate objects in place, you would need to use either variables (not meant for large objects; they live on the scheduler), actors (a niche use case), or shared memory; a Variable sketch follows at the end. In any case, it would break the normal dask "functional" assumption that the outcome of a task depends only on its inputs, and you would need to be careful around cases where a task might be called twice.
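
As a rough analogy for what happens to graph arguments (this is not the scheduler's actual code path, just an illustration), a plain pickle round-trip already shows why the mutation gets lost: deserialising yields an independent copy.

import pickle

dictionary = {}

# Serialising and deserialising produces a brand-new object,
# so mutating the copy cannot affect the original.
copy = pickle.loads(pickle.dumps(dictionary))
print(copy is dictionary)  # False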
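
A minimal sketch of the second point, assuming a local dask installation: the same graph run through the single-machine threaded scheduler is never serialised, so every task receives the very same dict object and the mutation is visible afterwards.

from dask.threaded import get  # single-machine threaded scheduler

def foo(dictionary):
    dictionary['foo'] = 'foo was called'

def bar(dictionary):
    dictionary['bar'] = 'bar was called'

dictionary = {}
dsk = {'foo': (foo, dictionary), 'bar': (bar, dictionary)}

get(dsk, ['foo', 'bar'])
print(dictionary)  # both 'foo' and 'bar' keys are now present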
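
And a rough sketch of the Variable route mentioned above, assuming dask.distributed; the variable name 'shared' and the function update_shared are illustrative only, and the read-modify-write is not atomic, so it is only safe while one task at a time touches the variable.

from dask.distributed import Client, Variable

client = Client(processes=False)

# The value lives on the scheduler; tasks read it, update it and write it
# back explicitly instead of mutating a Python object in place.
shared = Variable('shared')
shared.set({})

def update_shared():
    shared_on_worker = Variable('shared')  # reconnect by name inside the task
    current = shared_on_worker.get()
    current['foo'] = 'foo was called'
    shared_on_worker.set(current)

client.submit(update_shared).result()
print(shared.get())  # {'foo': 'foo was called'}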