python numpy dask

TypeError when running compute that includes map_blocks and reduce


I am having difficulty diagnosing the cause of an error. My code convolves arrays (with map_blocks) when they belong to the same group of variables; otherwise it just records the 2-dim array. I then take the argmax and append the result to a list, which I stack at the end.

I tried running compute with the scheduler='single-threaded' argument to help debug, but I still couldn't see the cause of the error.

import dask.array as da
from functools import reduce
import numpy as np

size = 100000
vals = da.linspace(0, 1, size)
nvars = 12
test = da.random.uniform(low=0, high=1, size=(100000, nvars, size), chunks=(100, nvars, size))

# number of total unique items corresponds to nvars
var_lookup = {
    'a': [0, 1],
    'b': [0, 1],
    'c': [0],
    'd': [0, 1],
    'e': [0],
    'f': [0, 1, 2],
    'g': [0],
}

# Iterates over all 0 dimension coordinates
# and convolves relevant values from x and y
def custom_convolve(x,y):
    temp_lst = []
    for i in range(x.shape[0]):
        a = da.fft.rfft(x[i])
        b = da.fft.rfft(y[i])
        conv_res = da.fft.irfft(a * b, n = size)
        temp_lst.append(conv_res)
    res = da.stack(temp_lst, axis=0)
    return res

n_groups = len(var_lookup.keys())

counter = 0
group_cols = []
for i in var_lookup.keys():
    grp = var_lookup[i]
    # if group consists of 1 value, then just record that 2-dim array
    if len(grp)==1:
        temp =  test[:,counter,:]
        counter += 1
    else:
        test_list = []
        for _ in var_lookup[i]:
            test_list.append(test[:, counter, :])
            counter += 1
        temp = reduce(lambda x, y: da.map_blocks(custom_convolve, x, y, dtype='float32'), test_list)

    res = vals[da.argmax(temp, axis=1)]

    group_cols.append(res)

loc = da.stack(group_cols, axis=1)

Error when running compute:

res = loc.compute()

The traceback from the last line is long, but it ends with:

File c:\Users\x\lib\site-packages\dask\array\slicing.py:990, in check_index(axis, ind, dimension)
    987 elif ind is None:
    988     return
--> 990 elif ind >= dimension or ind < -dimension:
    991     raise IndexError(
    992         f"Index {ind} is out of bounds for axis {axis} with size {dimension}"
    993     )

TypeError: '>=' not supported between instances of 'str' and 'int'

Maybe the reduce function coupled with map_blocks is causing the problem?

Debug attempt update 1:

I converted the code to a .py file, changed the compute argument to scheduler='single-threaded', added a pdb set_trace right after the for i line, and stepped through. It only errors out at the compute step, with the same error, so that was not helpful.

Debug attempt update 2:

I've identified the exact line that gives the problem. I simplified the code a little to make sure that it wasn't the reduce function and got rid of the loops.

size = 10000
x_vals = da.linspace(0, 1, 1000)
test = da.random.uniform(low=0, high=1, size=(size,4,1000), chunks=(size / 10, 1, 1000))

def simple_convolve(x, y):
    temp_lst = []
    for i in range(x.shape[0]):
        a = da.fft.rfft(x[i])
        b = da.fft.rfft(y[i])
        conv_res = da.fft.irfft(a * b, n = size)
        temp_lst.append(conv_res)
    res = da.stack(temp_lst, axis=0)
    return res

res = da.map_blocks(simple_convolve, test[:,0], test[:,1], dtype='float32')

temp = x_vals[da.argmax(res, axis=1)]

We get an error here. Drilling in, the error actually comes from running:

da.argmax(res, axis=1)

Since the error says I'm comparing a string and an integer, I checked that res has no NaNs and no infinite values:

# btw, I don't understand why a single compute still returns a dask array
da.isnan(res).sum().compute().compute()
0

(~da.isfinite(res)).sum().compute().compute()
0

Solution

  • As answered in https://dask.discourse.group/t/typeerror-on-da-argmax-when-executing-compute/2053:

    You need to work with NumPy arrays in simple_convolve, because this function is applied to the Dask Array's chunks, which are NumPy arrays. At the very least, it should return a NumPy array.
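
A minimal sketch of what that advice could look like, not the exact code from the linked thread. It assumes a few changes that are mine rather than the question's: much smaller array sizes (n_rows, n_points) so it runs quickly, integer chunk sizes via //, np.fft applied along the last axis instead of a Python loop, n taken from the block's last axis, and dtype='float64' because np.fft.irfft returns float64. The reduce call from the original question is included to show that it can stay as it was.

```python
from functools import reduce

import dask.array as da
import numpy as np

# Assumption: small illustrative sizes, not the sizes from the question.
n_rows, n_points = 200, 64

x_vals = da.linspace(0, 1, n_points)
test = da.random.uniform(
    low=0, high=1,
    size=(n_rows, 4, n_points),
    chunks=(n_rows // 10, 1, n_points),  # integer chunk sizes
)


def simple_convolve(x, y):
    # map_blocks passes x and y as NumPy blocks of shape
    # (rows_in_block, n_points), so use np.fft here and return NumPy.
    a = np.fft.rfft(x, axis=-1)
    b = np.fft.rfft(y, axis=-1)
    # Row-wise circular convolution; n keeps the output as long as the input,
    # so the output block shape matches the input block shape.
    return np.fft.irfft(a * b, n=x.shape[-1], axis=-1)


# Two-array case from "Debug attempt update 2".
res = da.map_blocks(simple_convolve, test[:, 0], test[:, 1], dtype="float64")
temp = x_vals[da.argmax(res, axis=1)]
out = temp.compute()  # a single compute now yields a NumPy array

# Group case from the original question: chain the convolution with reduce.
group = [test[:, k] for k in range(3)]
chained = reduce(
    lambda x, y: da.map_blocks(simple_convolve, x, y, dtype="float64"),
    group,
)
chained_out = x_vals[da.argmax(chained, axis=1)].compute()
```

If this reading is right, it would also explain why the question needed .compute().compute(): each block returned by the original function was itself a Dask array, so the first compute only unwrapped the outer layer.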