Search code examples
performanceoptimizationparallel-processingnumbajit

Problem by dictionaries to use numba njit parallelization to accelerate the code


I have written a code and try to use numba for accelerating the code. The main goal of the code is to group some values based on a condition. In this regard, iter_ is used for converging the code to satisfy the condition. I prepared a small case below to reproduce the sample code:

import numpy as np
import numba as nb

rng = np.random.default_rng(85)

# --------------------------------------- small data volume ---------------------------------------
# values_ = {'R0': np.array([0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723, 0.00990762,
#                                        0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723]),
#            'R1': np.array([0.01836379, 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088,
#                                        0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088]),
#            'R2': np.array([0.02430913, 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411,
#                                        0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411])}
#
# params = {'R0': [3, 0.9490579204466154, 1825, 7.070272000000002e-05],
#           'R1': [0, 0.9729203826820172, 167 , 7.070272000000002e-05],
#           'R2': [1, 0.6031363088057902, 1316, 8.007296000000003e-05]}
#
# Sno, dec_, upd_ = 2, 100, 200
# -------------------------------------------------------------------------------------------------

# ----------------------------- UPDATED (medium and large data volumes) ---------------------------
# values_ = np.load("values_med.npy", allow_pickle=True)[()]
# params = np.load("params_med.npy", allow_pickle=True)[()]
values_ = np.load("values_large.npy", allow_pickle=True)[()]
params = np.load("params_large.npy", allow_pickle=True)[()]

Sno, dec_, upd_ = 2000, 1000, 200
# -------------------------------------------------------------------------------------------------

# values_ = [*values_.values()]
# params = [*params.values()]


# @nb.jit(forceobj=True)
# def test(values_, params, Sno, dec_, upd_):

final_dict = {}
for i, j in enumerate(values_.keys()):
    Rand_vals = []
    goal_sum = params[j][1] * params[j][3]
    tel = goal_sum / dec_ * 10
    if params[j][0] != 0:
        for k in range(Sno):
            final_sum = 0.0
            iter_ = 0
            t = 1
            while not np.allclose(goal_sum, final_sum, atol=tel):
                iter_ += 1
                vals_group = rng.choice(values_[j], size=params[j][0], replace=False)
                # final_sum = 0.0016 * np.sum(vals_group)  # -----> For small data volume
                final_sum = np.sum(vals_group ** 3)        # -----> UPDATED For med or large data volume
                if iter_ == upd_:
                    t += 1
                    tel = t * tel
            values_[j] = np.delete(values_[j], np.where(np.in1d(values_[j], vals_group)))
            Rand_vals.append(vals_group)
    else:
        Rand_vals = [np.array([])] * Sno
    final_dict["R" + str(i)] = Rand_vals

#    return final_dict


# test(values_, params, Sno, dec_, upd_)

At first, for applying numba on this code @nb.jit was used (forceobj=True is used for avoiding warnings and …), which will have adverse effect on the performance. nopython is checked, too, with @nb.njit which get the following error due to not supporting (as mentioned in 1, 2) dictionary type of the inputs:

cannot determine Numba type of <class 'dict'>

I don't know if (how) it could be handled by Dict from numba.typed (by converting created python dictionaries to numba Dict) or if converting the dictionaries to lists of arrays have any advantage. I think, parallelization may be possible if some code lines e.g. Rand_vals.append(vals_group) or else section or … be taken or be modified out of the function to get the same results as before, but I don't have any idea how to do so.

I will be grateful for helping utilize numba on this code. numba parallelization will be the most desired (probably the best applicable method in terms of performance) solution if it could.


Data:


Solution

  • This code can be converted to Numba but it is not straightforward.

    First of all, the dictionary and list type must be defined since Numba njit functions cannot directly operate on reflected lists (aka. pure-python lists). This is a bit tedious to do in Numba and the resulting code is a bit verbose:

    String = nb.types.unicode_type
    ValueArray = nb.float64[::1]
    ValueDict = nb.types.DictType(String, ValueArray)
    ParamDictValue = nb.types.Tuple([nb.int_, nb.float64, nb.int_, nb.float64])
    ParamDict = nb.types.DictType(String, ParamDictValue)
    FinalDictValue = nb.types.ListType(ValueArray)
    FinalDict = nb.types.DictType(String, FinalDictValue)
    

    Then you need to convert the input dictionaries:

    nbValues = nb.typed.typeddict.Dict.empty(String, ValueArray)
    for key,value in values_.items():
        nbValues[key] = value.copy()
    
    nbParams = nb.typed.typeddict.Dict.empty(String, ParamDictValue)
    for key,value in params.items():
        nbParams[key] = (nb.int_(value[0]), nb.float64(value[1]), nb.int_(value[2]), nb.float64(value[3]))
    

    Then, you need to write the core function. np.allclose and np.isin are not implemented in Numba so they should be reimplemented manually. But the main point is that Numba does not support the rng Numpy object. I think it will certainly not support it any time soon. Note that Numba has an random numbers implementation that try to mimic the behavior of Numpy but the management of the seed is a bit different. Note also that results should be the same with the np.random.xxx Numpy functions if the seed is set to the same value (Numpy and Numba have different seed variables that are not synchronized).

    @nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
    def nbTest(values_, params, Sno, dec_, upd_):
        final_dict = nb.typed.Dict.empty(String, FinalDictValue)
        for i, j in enumerate(values_.keys()):
            Rand_vals = nb.typed.List.empty_list(ValueArray)
            goal_sum = params[j][1] * params[j][3]
            tel = goal_sum / dec_ * 10
            if params[j][0] != 0:
                for k in range(Sno):
                    final_sum = 0.0
                    iter_ = 0
                    t = 1
    
                    vals_group = np.empty(0, dtype=nb.float64)
    
                    while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
                        iter_ += 1
                        vals_group = np.random.choice(values_[j], size=params[j][0], replace=False)
                        final_sum = 0.0016 * np.sum(vals_group)
                        # final_sum = 0.0016 * np.sum(vals_group)  # (for small data volume)
                        final_sum = np.sum(vals_group ** 3)        # (for med or large data volume)
                        if iter_ == upd_:
                            t += 1
                            tel = t * tel
    
                    # Perform an in-place deletion
                    vals, gr = values_[j], vals_group
                    cur = 0
                    for l in range(vals.size):
                        found = False
                        for m in range(gr.size):
                            found |= vals[l] == gr[m]
                        if not found:
                            # Keep the value (delete it otherwise)
                            vals[cur] = vals[l]
                            cur += 1
                    values_[j] = vals[:cur]
    
                    Rand_vals.append(vals_group)
            else:
                for k in range(Sno):
                    Rand_vals.append(np.empty(0, dtype=nb.float64))
            final_dict["R" + str(i)] = Rand_vals
        return final_dict
    

    Note that the replacement implementation of np.isin is quite naive but it works pretty well in practice on your input example.

    The function can be called using the following way:

    nbFinalDict = nbTest(nbValues, nbParams, Sno, dec_, upd_)
    

    Finally, the dictionary should be converted back to basic Python objects:

    finalDict = dict()
    for key,value in nbFinalDict.items():
        finalDict[key] = list(value)
    

    This implementation is fast for small inputs but not large ones since np.random.choice takes almost all the time (>96%). The thing is this function is clearly not optimal when the number of requested item is small (which is your case). Indeed, it surprisingly runs in linear time of the input array and not in linear time of the number of requested items.


    Further Optimizations

    The algorithm can be completely rewritten to extract only 12 random items and discard them from the main currant array in a much more efficient way. The idea is to swap n items (small target sample) at the end of the array with other items at random locations, then check the sum, repeat this process until a condition is fulfilled, and finally extract the view to the last n items before resizing the view so to discard the last items. All of this can be done in O(n) time rather than O(m) time where m is the size of the main current array with n << m (eg. 12 VS 20_000). It can also be compute without any expensive allocation. Here is the resulting code:

    @nb.njit(nb.void(ValueArray, nb.int_, nb.int_))
    def swap(arr, i, j):
        arr[i], arr[j] = arr[j], arr[i]
    
    @nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
    def nbTest(values_, params, Sno, dec_, upd_):
        final_dict = nb.typed.Dict.empty(String, FinalDictValue)
        for i, j in enumerate(values_.keys()):
            Rand_vals = nb.typed.List.empty_list(ValueArray)
            goal_sum = params[j][1] * params[j][3]
            tel = goal_sum / dec_ * 10
            values = values_[j]
            n = params[j][0]
    
            if n != 0:
                for k in range(Sno):
                    final_sum = 0.0
                    iter_ = 0
                    t = 1
    
                    m = values.size
                    assert n <= m
                    group = values[-n:]
    
                    while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
                        iter_ += 1
    
                        # Swap the group view with other random items
                        for pos in range(m - n, m):
                            swap(values, pos, np.random.randint(0, m))
    
                        # For small data volume:
                        # final_sum = 0.0016 * np.sum(group)
    
                        # For med/large data volume
                        final_sum = 0.0
                        for v in group:
                            final_sum += v ** 3
    
                        if iter_ == upd_:
                            t += 1
                            tel *= t
    
                    assert iter_ > 0
                    values = values[:m-n]
                    Rand_vals.append(group)
            else:
                for k in range(Sno):
                    Rand_vals.append(np.empty(0, dtype=nb.float64))
            final_dict["R" + str(i)] = Rand_vals
        return final_dict
    

    In addition to being faster, this implementation as the benefit of being also simpler. Results looks quite similar to the previous implementation despite the randomness make the check of the results tricky (especially since this function does not use the same method to choose the random sample). Note that this implementation does not remove items in values that are in group as opposed to the previous one (this is probably not wanted though).


    Benchmark

    Here are the results of the last implementation on my machine (compilation and conversion timings excluded):

    Provided small input (embedded in the question):
     - Initial code:   42.71 ms
     - Numba code:      0.11 ms
    
    Medium input:
     - Initial code:   3481 ms
     - Numba code:       11 ms
    
    Large input:
     - Initial code:   6728 ms
     - Numba code:       20 ms
    

    Note that the conversion time takes about the same time than the computation.

    This last implementation is 316~388 times faster than the initial code on small inputs.


    Notes

    Note that the compilation time takes few seconds due to the dict and lists types.

    Note that while it may be possible to parallelise the implementation, only the most encompassing loop can be parallelised. The thing is there is only few items to compute and the time is already quite small (not the best case for multi-threading). <-- Additionally, the creation of many temporary arrays (created by rng.choice) will certainly cause the parallel loop not to scale well anyway. --> Additionally, the list/dict cannot be written from multiple threads safely so one need to use Numpy arrays in the whole function to be able to do that (or add additional conversion that are already expensive). Moreover, Numba parallelism tends to increase significantly the compilation time which is already significant. Finally, the result will be less deterministic since each Numba thread has its own random number generator seed and the items computed by the threads cannot be predicted with prange (dependent of the parallel runtime chosen on the target platform). Note that in Numpy there is one global seed by default used by usual random functions (deprecated way) and RNG objects have their own seed (new preferred way).