I have a 100K by 12 by 100K array that I need to populate with computation results. I tried creating it with numpy.empty but got a memory error.
So I turned to dask instead. I'm able to create the dask array. I run a function that produces a vector as I traverse the 0th and 1st dimensions in a for loop, and I then write this vector into the (i, j)th position of the array. If I populate the dask array as is, the assignment step alone takes 50 milliseconds, which is far too long when extrapolated to all cells of the array.
It seems it should be possible to speed up the assignment with dask's delayed function, but I can't figure out how.
Here's how this would look without delayed:
import dask.array as da
import numpy as np
from dask import compute, delayed
test_arr = da.empty(shape=(10000, 12, 10000), dtype='float32')
for i in range(test_arr.shape[0]):
    for j in range(test_arr.shape[1]):
        vals = np.random.normal(size=test_arr.shape[2])
        test_arr[i, j, :] = vals
And here is my attempt at using delayed:
def populate_array(i, j, vec):
    test_arr[i, j, :] = vec
    return test_arr
for i in range(test_arr.shape[0]):
    for j in range(test_arr.shape[1]):
        vals = np.random.normal(size=test_arr.shape[2])
        delayed(populate_array)(i, j, vals)
compute(test_arr)
The latter doesn't error but just seems to return an array with all zeroes.
I know that I could also speed this up by getting rid of the for loop and vectorizing, but assume that is currently not feasible.
I'm not tied to dask per se but it seems like a practical approach with a familiar syntax if coming from pandas / numpy.
Update: The accepted answer works, but the task stream has a lot of blank spaces. I bring this up because my actual use case, with a complex create_array_chunk formula, just hangs. I cannot see the dashboard or what's going on.
This is how I'd do it. You don't fill an existing Dask Array, you build it chunk by chunk:
import dask.array as da
import numpy as np
from dask import delayed
shape = (10000, 12, 10000)
def create_array_chunk(i, j, k):
    # Placeholder: a real implementation would probably use i and j here
    return np.random.normal(size=k)
i_arrays = []
for i in range(shape[0]):
    j_arrays = []
    for j in range(shape[1]):
        # Wrap each lazily computed vector as a 1-D Dask Array
        darray = da.from_delayed(delayed(create_array_chunk)(i, j, shape[2]), dtype=np.float64, shape=(shape[2],))
        j_arrays.append(darray)
    # Stack the vectors for this i into a (12, 10000) array
    j_stack = da.stack(j_arrays, axis=0)
    i_arrays.append(j_stack)
# Stack all per-i blocks into the final (10000, 12, 10000) array
j_stack = da.stack(i_arrays, axis=0)
j_stack is a Dask Array of shape (10000, 12, 10000).
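To make the resulting structure easier to inspect, here is a scaled-down sketch of the same construction. The tiny shape, the deterministic placeholder body of create_array_chunk, and the names small and one_vector are assumptions made for illustration only; they are not part of the original answer.

```python
import dask.array as da
import numpy as np
from dask import delayed

# Scaled-down shape (assumption for illustration; the real one is (10000, 12, 10000)).
shape = (4, 3, 5)


def create_array_chunk(i, j, k):
    # Hypothetical placeholder: a length-k vector whose value encodes i and j.
    return np.full(k, 10 * i + j, dtype=np.float64)


rows = []
for i in range(shape[0]):
    # One lazily evaluated 1-D array per (i, j) pair.
    cols = [
        da.from_delayed(
            delayed(create_array_chunk)(i, j, shape[2]),
            dtype=np.float64,
            shape=(shape[2],),
        )
        for j in range(shape[1])
    ]
    rows.append(da.stack(cols, axis=0))
small = da.stack(rows, axis=0)

print(small.shape)   # overall shape of the lazy array
print(small.chunks)  # each (i, j) vector is its own chunk

# Computing a single slice only runs the task(s) that slice depends on.
one_vector = small[2, 1, :].compute()
print(one_vector)
```

Each (i, j) vector ends up as its own chunk, so the full-size array would contain 10000 * 12 = 120000 chunks, one per delayed call.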
j_stack is lazy; nothing has been computed yet. Be careful: calling compute on it converts it to a NumPy array, which takes a lot of memory (about 9.6 GB for float64 at this shape). You might want to stream it to disk using to_zarr or equivalent.
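As a rough sketch of streaming to disk without materializing the whole array, the example below uses da.store with a NumPy memory-mapped .npy file as one possible "equivalent" of to_zarr (to_zarr itself additionally needs the zarr package). The small stand-in array lazy, the temporary path, and the file name result.npy are assumptions for illustration.

```python
import os
import tempfile

import dask.array as da
import numpy as np
from numpy.lib.format import open_memmap

# Stand-in for the lazy array built above (assumption: tiny shape, known values).
shape = (4, 3, 5)
lazy = da.from_array(np.arange(60, dtype=np.float64).reshape(shape), chunks=(1, 3, 5))

# Pre-allocate an on-disk .npy file with the final shape and dtype.
path = os.path.join(tempfile.mkdtemp(), "result.npy")  # hypothetical location
target = open_memmap(path, mode="w+", dtype=lazy.dtype, shape=lazy.shape)

# da.store computes the array chunk by chunk and writes each chunk
# into the matching region of the target.
da.store(lazy, target)
target.flush()
del target

# Re-open the file memory-mapped; slices are read from disk on demand.
loaded = np.load(path, mmap_mode="r")
print(loaded.shape)
```

With zarr installed, calling lazy.to_zarr("result.zarr") should play the same role and stores the data in a chunked format.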