I have written a code and try to use numba for accelerating the code. The main goal of the code is to group some values based on a condition. In this regard, iter_
is used for converging the code to satisfy the condition. I prepared a small case below to reproduce the sample code:
import numpy as np
import numba as nb
rng = np.random.default_rng(85)
# --------------------------------------- small data volume ---------------------------------------
# values_ = {'R0': np.array([0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723, 0.00990762,
# 0.01090976, 0.01069902, 0.00724112, 0.0068463 , 0.01135723]),
# 'R1': np.array([0.01836379, 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088,
# 0.01900166, 0.01864162, 0.0182823 , 0.01840322, 0.01653088]),
# 'R2': np.array([0.02430913, 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411,
# 0.02239156, 0.02225379, 0.02093393, 0.02408692, 0.02110411])}
#
# params = {'R0': [3, 0.9490579204466154, 1825, 7.070272000000002e-05],
# 'R1': [0, 0.9729203826820172, 167 , 7.070272000000002e-05],
# 'R2': [1, 0.6031363088057902, 1316, 8.007296000000003e-05]}
#
# Sno, dec_, upd_ = 2, 100, 200
# -------------------------------------------------------------------------------------------------
# ----------------------------- UPDATED (medium and large data volumes) ---------------------------
# values_ = np.load("values_med.npy", allow_pickle=True)[()]
# params = np.load("params_med.npy", allow_pickle=True)[()]
values_ = np.load("values_large.npy", allow_pickle=True)[()]
params = np.load("params_large.npy", allow_pickle=True)[()]
Sno, dec_, upd_ = 2000, 1000, 200
# -------------------------------------------------------------------------------------------------
# values_ = [*values_.values()]
# params = [*params.values()]
# @nb.jit(forceobj=True)
# def test(values_, params, Sno, dec_, upd_):
final_dict = {}
for i, j in enumerate(values_.keys()):
Rand_vals = []
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
if params[j][0] != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
while not np.allclose(goal_sum, final_sum, atol=tel):
iter_ += 1
vals_group = rng.choice(values_[j], size=params[j][0], replace=False)
# final_sum = 0.0016 * np.sum(vals_group) # -----> For small data volume
final_sum = np.sum(vals_group ** 3) # -----> UPDATED For med or large data volume
if iter_ == upd_:
t += 1
tel = t * tel
values_[j] = np.delete(values_[j], np.where(np.in1d(values_[j], vals_group)))
Rand_vals.append(vals_group)
else:
Rand_vals = [np.array([])] * Sno
final_dict["R" + str(i)] = Rand_vals
# return final_dict
# test(values_, params, Sno, dec_, upd_)
At first, for applying numba on this code @nb.jit
was used (forceobj=True
is used for avoiding warnings and …), which will have adverse effect on the performance. nopython
is checked, too, with @nb.njit
which get the following error due to not supporting (as mentioned in 1, 2) dictionary type of the inputs:
cannot determine Numba type of <class 'dict'>
I don't know if (how) it could be handled by Dict
from numba.typed
(by converting created python dictionaries to numba Dict) or if converting the dictionaries to lists of arrays have any advantage. I think, parallelization may be possible if some code lines e.g. Rand_vals.append(vals_group)
or else section or … be taken or be modified out of the function to get the same results as before, but I don't have any idea how to do so.
I will be grateful for helping utilize numba on this code. numba parallelization
will be the most desired (probably the best applicable method in terms of performance) solution if it could.
This code can be converted to Numba but it is not straightforward.
First of all, the dictionary and list type must be defined since Numba njit
functions cannot directly operate on reflected lists (aka. pure-python lists). This is a bit tedious to do in Numba and the resulting code is a bit verbose:
String = nb.types.unicode_type
ValueArray = nb.float64[::1]
ValueDict = nb.types.DictType(String, ValueArray)
ParamDictValue = nb.types.Tuple([nb.int_, nb.float64, nb.int_, nb.float64])
ParamDict = nb.types.DictType(String, ParamDictValue)
FinalDictValue = nb.types.ListType(ValueArray)
FinalDict = nb.types.DictType(String, FinalDictValue)
Then you need to convert the input dictionaries:
nbValues = nb.typed.typeddict.Dict.empty(String, ValueArray)
for key,value in values_.items():
nbValues[key] = value.copy()
nbParams = nb.typed.typeddict.Dict.empty(String, ParamDictValue)
for key,value in params.items():
nbParams[key] = (nb.int_(value[0]), nb.float64(value[1]), nb.int_(value[2]), nb.float64(value[3]))
Then, you need to write the core function. np.allclose
and np.isin
are not implemented in Numba so they should be reimplemented manually. But the main point is that Numba does not support the rng
Numpy object. I think it will certainly not support it any time soon. Note that Numba has an random numbers implementation that try to mimic the behavior of Numpy but the management of the seed is a bit different. Note also that results should be the same with the np.random.xxx
Numpy functions if the seed is set to the same value (Numpy and Numba have different seed variables that are not synchronized).
@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
final_dict = nb.typed.Dict.empty(String, FinalDictValue)
for i, j in enumerate(values_.keys()):
Rand_vals = nb.typed.List.empty_list(ValueArray)
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
if params[j][0] != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
vals_group = np.empty(0, dtype=nb.float64)
while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
iter_ += 1
vals_group = np.random.choice(values_[j], size=params[j][0], replace=False)
final_sum = 0.0016 * np.sum(vals_group)
# final_sum = 0.0016 * np.sum(vals_group) # (for small data volume)
final_sum = np.sum(vals_group ** 3) # (for med or large data volume)
if iter_ == upd_:
t += 1
tel = t * tel
# Perform an in-place deletion
vals, gr = values_[j], vals_group
cur = 0
for l in range(vals.size):
found = False
for m in range(gr.size):
found |= vals[l] == gr[m]
if not found:
# Keep the value (delete it otherwise)
vals[cur] = vals[l]
cur += 1
values_[j] = vals[:cur]
Rand_vals.append(vals_group)
else:
for k in range(Sno):
Rand_vals.append(np.empty(0, dtype=nb.float64))
final_dict["R" + str(i)] = Rand_vals
return final_dict
Note that the replacement implementation of np.isin
is quite naive but it works pretty well in practice on your input example.
The function can be called using the following way:
nbFinalDict = nbTest(nbValues, nbParams, Sno, dec_, upd_)
Finally, the dictionary should be converted back to basic Python objects:
finalDict = dict()
for key,value in nbFinalDict.items():
finalDict[key] = list(value)
This implementation is fast for small inputs but not large ones since np.random.choice
takes almost all the time (>96%). The thing is this function is clearly not optimal when the number of requested item is small (which is your case). Indeed, it surprisingly runs in linear time of the input array and not in linear time of the number of requested items.
The algorithm can be completely rewritten to extract only 12 random items and discard them from the main currant array in a much more efficient way. The idea is to swap n
items (small target sample) at the end of the array with other items at random locations, then check the sum, repeat this process until a condition is fulfilled, and finally extract the view to the last n
items before resizing the view so to discard the last items. All of this can be done in O(n)
time rather than O(m)
time where m
is the size of the main current array with n << m
(eg. 12 VS 20_000). It can also be compute without any expensive allocation. Here is the resulting code:
@nb.njit(nb.void(ValueArray, nb.int_, nb.int_))
def swap(arr, i, j):
arr[i], arr[j] = arr[j], arr[i]
@nb.njit(FinalDict(ValueDict, ParamDict, nb.int_, nb.int_, nb.int_))
def nbTest(values_, params, Sno, dec_, upd_):
final_dict = nb.typed.Dict.empty(String, FinalDictValue)
for i, j in enumerate(values_.keys()):
Rand_vals = nb.typed.List.empty_list(ValueArray)
goal_sum = params[j][1] * params[j][3]
tel = goal_sum / dec_ * 10
values = values_[j]
n = params[j][0]
if n != 0:
for k in range(Sno):
final_sum = 0.0
iter_ = 0
t = 1
m = values.size
assert n <= m
group = values[-n:]
while np.abs(goal_sum - final_sum) > (1e-05 * np.abs(final_sum) + tel):
iter_ += 1
# Swap the group view with other random items
for pos in range(m - n, m):
swap(values, pos, np.random.randint(0, m))
# For small data volume:
# final_sum = 0.0016 * np.sum(group)
# For med/large data volume
final_sum = 0.0
for v in group:
final_sum += v ** 3
if iter_ == upd_:
t += 1
tel *= t
assert iter_ > 0
values = values[:m-n]
Rand_vals.append(group)
else:
for k in range(Sno):
Rand_vals.append(np.empty(0, dtype=nb.float64))
final_dict["R" + str(i)] = Rand_vals
return final_dict
In addition to being faster, this implementation as the benefit of being also simpler. Results looks quite similar to the previous implementation despite the randomness make the check of the results tricky (especially since this function does not use the same method to choose the random sample). Note that this implementation does not remove items in values
that are in group
as opposed to the previous one (this is probably not wanted though).
Here are the results of the last implementation on my machine (compilation and conversion timings excluded):
Provided small input (embedded in the question):
- Initial code: 42.71 ms
- Numba code: 0.11 ms
Medium input:
- Initial code: 3481 ms
- Numba code: 11 ms
Large input:
- Initial code: 6728 ms
- Numba code: 20 ms
Note that the conversion time takes about the same time than the computation.
This last implementation is 316~388 times faster than the initial code on small inputs.
Note that the compilation time takes few seconds due to the dict and lists types.
Note that while it may be possible to parallelise the implementation, only the most encompassing loop can be parallelised. The thing is there is only few items to compute and the time is already quite small (not the best case for multi-threading). <-- Additionally, the creation of many temporary arrays (created by rng.choice
) will certainly cause the parallel loop not to scale well anyway. --> Additionally, the list/dict cannot be written from multiple threads safely so one need to use Numpy arrays in the whole function to be able to do that (or add additional conversion that are already expensive). Moreover, Numba parallelism tends to increase significantly the compilation time which is already significant. Finally, the result will be less deterministic since each Numba thread has its own random number generator seed and the items computed by the threads cannot be predicted with prange
(dependent of the parallel runtime chosen on the target platform). Note that in Numpy there is one global seed by default used by usual random functions (deprecated way) and RNG objects have their own seed (new preferred way).