I am struggling to correctly parallelise a function using Cython. Basically, the problem is to bin some data. The actual code is a bit long, but in the end it does something like this:
```
def bin_var(double[:] dist,
            double[:] values,
            double[:] bin_def,
            double[:] varg, long[:] count):
    dbin = (bin_def[1] - bin_def[0]) / bin_def[2]
    for n1 in range(values.size):
        if (dist[n1] < bin_def[0]) or (dist[n1] >= bin_def[1]):
            continue
        else:
            ni = int((dist[n1] - bin_def[0]) / dbin)
            count[ni] += 1
            varg[ni] += calc_something(values[n1])

    # Compute the mean
    for n1 in range(int(bin_def[2])):
        varg[n1] /= count[n1]
```
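For reference, the same binning logic can be written as a serial NumPy sketch. This is only an illustration of what the loop computes: `calc_something` is not defined in the question, so a squaring placeholder stands in for it, and empty bins are guarded against division by zero (the loop above would divide by zero there).

```python
import numpy as np

def calc_something(v):
    # placeholder: the real per-value computation is not shown in the question
    return v * v

def bin_var_numpy(dist, values, bin_def):
    """Serial reference: bin values by dist into int(bin_def[2])
    equal-width bins spanning [bin_def[0], bin_def[1])."""
    nbins = int(bin_def[2])
    dbin = (bin_def[1] - bin_def[0]) / bin_def[2]
    # keep only points that fall inside the binned range
    mask = (dist >= bin_def[0]) & (dist < bin_def[1])
    ni = ((dist[mask] - bin_def[0]) / dbin).astype(np.intp)
    count = np.bincount(ni, minlength=nbins)
    varg = np.bincount(ni, weights=calc_something(values[mask]),
                       minlength=nbins)
    # guard empty bins when taking the mean
    return varg / np.maximum(count, 1), count
```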
This code lends itself to some simple parallelisation (`values` and `dist` are very large): one needs to split the first `for` loop over separate processes, each working on its own version of the `count` and `varg` arrays. When that is done, one has to combine everything together by summing the different versions of `count` and `varg` before the second `for` loop (much shorter).
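The split-and-merge scheme just described can be prototyped serially in plain NumPy before touching Cython at all: split the data into chunks (one per would-be thread), bin each chunk into its own private `count`/`varg` pair, then sum the copies before averaging. Names and the squaring stand-in for `calc_something` are illustrative only.

```python
import numpy as np

def bin_chunk(dist, values, bin_def):
    # serial binning of one chunk into private count/varg arrays
    nbins = int(bin_def[2])
    dbin = (bin_def[1] - bin_def[0]) / bin_def[2]
    count = np.zeros(nbins, dtype=np.int64)
    varg = np.zeros(nbins)
    for d, v in zip(dist, values):
        if bin_def[0] <= d < bin_def[1]:
            ni = int((d - bin_def[0]) / dbin)
            count[ni] += 1
            varg[ni] += v * v          # stand-in for calc_something
    return count, varg

def bin_split_merge(dist, values, bin_def, nchunks=4):
    # same structure a parallel version would have, run serially here:
    # each chunk gets private arrays...
    parts = [bin_chunk(d, v, bin_def)
             for d, v in zip(np.array_split(dist, nchunks),
                             np.array_split(values, nchunks))]
    # ...which are summed before the (short) averaging step
    count = sum(c for c, _ in parts)
    varg = sum(g for _, g in parts)
    return varg / np.maximum(count, 1), count
```

Checking that this matches the single-chunk result confirms the merge step is just an elementwise sum, which is the only extra work the parallel version needs.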
That said, I have spent two days trying to understand how to implement this efficiently in Cython, and I am starting to suspect it is not possible with the current version of the language. Note that just using `prange` from `cython.parallel` for the first loop does not produce correct results, because (I assume) of the simultaneous access to `ni`, `count` and `varg` from different threads.
Is Cython parallel support really so limited? I was getting such a nice speedup single-threaded, I just hoped I could continue...
I can think of three options here:
Use the GIL to ensure that the `+=` is done single-threaded:

```
varg_ni = calc_something(values[n1]) # keep this out
                                     # of the single-threaded block...
with gil:
    count[ni] += 1
    varg[ni] += varg_ni
```
This is easy and won't be too bad provided the work done in `calc_something` is reasonably large.
Make `count` and `varg` 2D arrays, with each thread writing to a different column; sum along the second dimension afterwards:

```
# rough, untested outline....
# note: omp_get_max_threads(), since omp_get_num_threads()
# returns 1 outside a `with parallel()` block
num_threads = openmp.omp_get_max_threads()
cdef long[:,:] count_tmp = np.zeros((count.shape[0], num_threads),
                                    dtype=np.int64)
cdef double[:,:] varg_tmp = np.zeros((varg.shape[0], num_threads))

# then in the loop:
count_tmp[ni, cython.parallel.threadid()] += 1
varg_tmp[ni, cython.parallel.threadid()] += calc_something(values[n1])

# after the loop:
count[:] = np.sum(count_tmp, axis=1)
varg[:] = np.sum(varg_tmp, axis=1)
```

You could also do something similar using the ideas in the `local_buf` example in the documentation.
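The column-per-thread pattern can be checked in pure Python with the `threading` module. The GIL means this brings no speedup, but the indexing pattern is the same one the `prange` version would use: each thread owns one column, so no writes collide, and the final sum over the thread axis recovers the serial result (`calc_something` is again a squaring stand-in):

```python
import threading
import numpy as np

def demo_thread_columns(dist, values, bin_def, num_threads=4):
    nbins = int(bin_def[2])
    dbin = (bin_def[1] - bin_def[0]) / bin_def[2]
    count_tmp = np.zeros((nbins, num_threads), dtype=np.int64)
    varg_tmp = np.zeros((nbins, num_threads))

    def worker(tid, lo, hi):
        # each thread touches only column `tid`, so writes never collide
        for n1 in range(lo, hi):
            if bin_def[0] <= dist[n1] < bin_def[1]:
                ni = int((dist[n1] - bin_def[0]) / dbin)
                count_tmp[ni, tid] += 1
                varg_tmp[ni, tid] += values[n1] ** 2  # stand-in calc_something

    # split the index range into one contiguous chunk per thread
    bounds = np.linspace(0, len(dist), num_threads + 1).astype(int)
    threads = [threading.Thread(target=worker,
                                args=(t, bounds[t], bounds[t + 1]))
               for t in range(num_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    # the combination step: sum across the thread axis
    return count_tmp.sum(axis=1), varg_tmp.sum(axis=1)
```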
(NOTE - GCC is currently giving me an "internal compiler error" for this. I feel it should work, but for the moment it doesn't seem to, so try option 3 at your own risk...) Use the OpenMP `atomic` directive to do the addition atomically. This requires a bit of work to circumvent Cython, but shouldn't be too difficult. Create a short C header file with an `add_inplace` macro:

```
#define add_inplace(x,y) _Pragma("omp atomic") x+=y
```

`_Pragma` is a C99 feature that should allow you to put pragmas in preprocessor statements. Then tell Cython about that header file (as if it declares a function):

```
cdef extern from "header.h":
    void add_inplace(...) nogil # just use varargs to make Cython think it accepts anything
```

Then in the loop do:

```
add_inplace(count[ni], 1)
add_inplace(varg[ni], calc_something(values[n1]))
```

Because this uses macro trickery it may be a bit fragile (i.e. it definitely won't work with `PyObject*`s), but it should generate the correct C code when using standard C numeric types. (Inspect the generated code to be sure.)