I am running dask distributed and would like intermediate values to be saved. For example, when I run this:
from distributed import Client
client = Client(IP_PORT_TO_SCHEDULER)
from dask import delayed
@delayed(pure=True)
def myfunction(a):
    print("recomputing")
    return a + 3
res = myfunction(1)
res2 = res**2
res3 = client.persist(res2)
resagain = res**3
resagain2 = client.persist(resagain)
I would expect "recomputing" to print only once. However, in this case it prints twice. I think this might be because the intermediate value is not kept on the workers. For example, running client.has_what(), I see this:
{'tcp://xx.xx.xx.xx:xxxx': ['pow-9d66a68ce8be79ff9cca17a2dc58aa0b',
'pow-440784f1abedb14511aa0d633935b55a']}
I see the final results of the power functions but not the intermediate computation. Is there a way to force Dask to keep this intermediate result? Thanks!
Dask will hold on to all results that you have explicitly persisted. Any intermediate results are released once nothing needs them, in order to save memory, which is why myfunction runs again when you persist the second expression.
So in your case you probably want to do the following:
res = myfunction(1)
res = res.persist() # Ask Dask to keep this in memory
...
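For reference, here is a minimal end-to-end sketch of that approach. It assumes Dask's default local scheduler rather than the cluster at IP_PORT_TO_SCHEDULER, so it runs without a Client. The `calls` list and the `values` tuple are hypothetical additions, used only to count how often the function body really executes.

```python
from dask import delayed

calls = []  # hypothetical helper: one entry per real execution of myfunction


@delayed(pure=True)
def myfunction(a):
    calls.append(a)
    print("recomputing")
    return a + 3


res = myfunction(1)
res = res.persist()  # compute the intermediate once and keep its result

# Both downstream expressions now start from the persisted intermediate.
res2 = (res ** 2).persist()
resagain = (res ** 3).persist()

values = (res2.compute(), resagain.compute())
print(values, len(calls))
```

With a distributed Client active, the same persist calls should go through the cluster, and the intermediate stays in worker memory for as long as you hold a reference to the persisted res.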