Tags: multithreading, multiprocessing, cython, python-multiprocessing

Parallelising a 'for' loop in Cython: beyond prange


I am struggling to correctly parallelise a function using Cython. Basically, the problem is to bin some data. The actual code is a bit long, but in the end it does something like this:

def bin_var(double[:] dist,
            double[:] values,
            double[:] bin_def,
            double[:] varg, long[:] count):

    cdef double dbin = (bin_def[1] - bin_def[0]) / bin_def[2]
    cdef Py_ssize_t n1, ni

    for n1 in range(values.shape[0]):
        if (dist[n1] < bin_def[0]) or (dist[n1] >= bin_def[1]):
            continue
        else:
            ni = int((dist[n1] - bin_def[0]) / dbin)
            count[ni] += 1
            varg[ni] += calc_something(values[n1])

    # Compute the mean
    for n1 in range(int(bin_def[2])):
        varg[n1] /= count[n1]

This code lends itself to some simple parallelisation (values and dist are very large): split the first for loop over separate processes, each working on its own copy of the count and varg arrays. When that is done, combine everything by summing the different copies of count and varg before running the (much shorter) second for loop.
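
To make the combination step concrete, in plain NumPy it would look something like this (the per-worker partial results here are made up, standing in for the real output):

import numpy as np

# hypothetical partial results: two workers, five bins each
count_parts = [np.array([1, 0, 2, 1, 0]), np.array([0, 2, 1, 0, 1])]
varg_parts = [np.array([0.5, 0.0, 1.2, 0.3, 0.0]),
              np.array([0.0, 0.8, 0.4, 0.0, 0.9])]

count = np.sum(count_parts, axis=0)  # combine the per-worker counts
varg = np.sum(varg_parts, axis=0)    # combine the per-worker sums
varg = varg / np.maximum(count, 1)   # the short second loop (guarding empty bins)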

That said, I have now spent two days trying to work out how to implement this efficiently in Cython, and I am starting to suspect it is not possible with the current version of the language. Note that simply using prange from cython.parallel for the first loop does not give correct results, presumably because of the simultaneous access to ni, count and varg from different threads.

Is Cython parallel support really so limited? I was getting such a nice speedup single-threaded, I just hoped I could continue...


Solution

  • I can think of three options here:

    1. Use the GIL to ensure that the += is done single-threaded:

      varg_ni = calc_something(values[n1])  # keep this outside
                                            # the single-threaded block...
      with gil:
          count[ni] += 1
          varg[ni] += varg_ni
      

      This is easy and won't be too bad, provided the work done in calc_something is reasonably large relative to the serialised updates; see the sketch below.
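
      Here is a rough sketch of the whole loop under this option (the body of calc_something is a stand-in; I assume it is a nogil cdef function):

      from cython.parallel import prange

      cdef inline double calc_something(double x) nogil:
          return x * x  # stand-in for the real calculation

      def bin_var_gil(double[:] dist, double[:] values,
                      double[:] bin_def, double[:] varg, long[:] count):
          cdef double dbin = (bin_def[1] - bin_def[0]) / bin_def[2]
          cdef Py_ssize_t n1, ni
          cdef double varg_ni

          for n1 in prange(values.shape[0], nogil=True):
              if (dist[n1] < bin_def[0]) or (dist[n1] >= bin_def[1]):
                  continue
              ni = <Py_ssize_t>((dist[n1] - bin_def[0]) / dbin)
              varg_ni = calc_something(values[n1])  # still runs in parallel
              with gil:  # serialise only the two updates
                  count[ni] += 1
                  varg[ni] += varg_ni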

    2. Make count and varg 2D arrays with each thread writing to a different column. Sum along the second dimension afterwards:

      # rough, untested outline....

      import numpy as np
      cimport cython.parallel
      cimport openmp

      # omp_get_num_threads() only reports the real count from inside a
      # parallel block; outside one, ask for omp_get_max_threads() instead
      num_threads = openmp.omp_get_max_threads()

      cdef long[:,:] count_tmp = np.zeros((count.shape[0], num_threads),
                                          dtype=np.int64)
      cdef double[:,:] varg_tmp = np.zeros((varg.shape[0], num_threads))

      # then in the loop:
      count_tmp[ni, cython.parallel.threadid()] += 1
      varg_tmp[ni, cython.parallel.threadid()] += calc_something(values[n1])

      # after the loop:
      count[:] = np.sum(count_tmp, axis=1)
      varg[:] = np.sum(varg_tmp, axis=1)
      

      You could also do something similar using the ideas in the local_buf example in the documentation; a fully assembled version of this option is sketched below.
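
      For reference, pulling those fragments together into one (still untested) function, again assuming a nogil calc_something as in option 1:

      import numpy as np
      cimport openmp
      from cython.parallel import prange, threadid

      def bin_var_2d(double[:] dist, double[:] values,
                     double[:] bin_def, double[:] varg, long[:] count):
          cdef double dbin = (bin_def[1] - bin_def[0]) / bin_def[2]
          cdef int num_threads = openmp.omp_get_max_threads()
          cdef long[:,:] count_tmp = np.zeros((count.shape[0], num_threads),
                                              dtype=np.int64)
          cdef double[:,:] varg_tmp = np.zeros((varg.shape[0], num_threads))
          cdef Py_ssize_t n1, ni
          cdef int tid

          for n1 in prange(values.shape[0], nogil=True):
              if (dist[n1] < bin_def[0]) or (dist[n1] >= bin_def[1]):
                  continue
              ni = <Py_ssize_t>((dist[n1] - bin_def[0]) / dbin)
              tid = threadid()  # each thread writes to its own column
              count_tmp[ni, tid] += 1
              varg_tmp[ni, tid] += calc_something(values[n1])

          count[:] = np.sum(count_tmp, axis=1)  # combine the per-thread columns
          varg[:] = np.sum(varg_tmp, axis=1)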

    3. (NOTE - GCC is currently giving me an "internal compiler error" for this. I feel it should work, but for the moment it doesn't, so use this option at your own risk...) Use the OpenMP atomic directive to do the addition atomically. This requires a bit of work to get around Cython, but shouldn't be too difficult. Create a short C header file with an add_inplace macro:

      #define add_inplace(x,y) _Pragma("omp atomic") x+=y
      

      _Pragma is a C99 feature that lets you emit pragmas from preprocessor macros (a plain #pragma line cannot appear inside a #define). Then tell Cython about that header file, declaring the macro as if it were a function:

      cdef extern from "header.h":
          void add_inplace(...) nogil # just use varargs to make Cython think it accepts anything
      

      Then in the loop do:

      add_inplace(count[ni], 1)
      add_inplace(varg[ni], calc_something(values[n1]))
      

      Because this uses macro trickery it may be a bit fragile (for example, it definitely won't work with PyObject*s), but it should generate the correct C code for standard C numeric types. (Inspect the generated C to be sure.)
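
      One final practical note that applies to all three options: prange and the omp pragmas are no-ops unless the extension is compiled and linked with OpenMP enabled. A typical setup.py (flags shown for GCC/Clang; MSVC wants /openmp instead) looks something like:

      from setuptools import Extension, setup
      from Cython.Build import cythonize

      ext = Extension(
          "bin_var",                        # assuming the code lives in bin_var.pyx
          ["bin_var.pyx"],
          extra_compile_args=["-fopenmp"],  # /openmp on MSVC
          extra_link_args=["-fopenmp"],
      )

      setup(ext_modules=cythonize([ext]))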