Search code examples
pythonc++numpycythongil

How to assign numpy.ndarray to temporary variable under nogil loop in cython?


I'm trying to implement an implicit recommender model and am having issues with the run time of the code that calculates the top 5 suggestions for ~11 million users over ~100k items.

I was able to partly solve the problem using numpy with a sprinkle of Cython (in a Jupyter notebook). The lines that do the numpy sorting still use a single core:

%%cython -f
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: linetrace=True
# cython: binding=True
# distutils: define_macros=CYTHON_TRACE_NOGIL=1
from cython.parallel import parallel, prange
import numpy as np
from tqdm import tqdm

def test(
    users_items=np.random.rand(11402139//1000, 134751//100),
    int N=5,
    show_progress=True,
    int num_threads=1,
):
    """Fill a (users, N) table with the indices of the N best-scoring items.

    For every row of ``users_items`` a fresh random score vector is drawn,
    the N largest scores are selected with a partial sort and their indices
    are stored in ascending score order.

    Only the row count of ``users_items`` is read; ``num_threads`` is accepted
    but not used by this single-threaded version. ``show_progress`` toggles
    the tqdm progress bar.

    Returns a numpy array view of the filled ``intc`` table.
    """
    # Row count plus the two loop counters, typed for C-speed indexing
    cdef int n_users = users_items.shape[0]
    cdef int row, col
    # C-contiguous output table, one row of N item indices per user
    cdef int[:,::1] recs = np.zeros((n_users, N), dtype=np.intc)
    for row in tqdm(range(n_users), total=n_users, disable=not show_progress):
        # Matrix-vector product; numpy's .dot can use several cores
        item_factors = np.random.rand(134751//1000, 10)
        user_factors = np.random.rand(10)
        scores = item_factors.dot(user_factors)
        # Partial sort: the last N positions hold the N largest scores, unordered
        candidates = np.argpartition(scores, -N)[-N:]
        # Order just those N candidates by their score
        ranking = np.argsort(scores[candidates])
        best = candidates[ranking]
        # Copy element by element into the typed memoryview
        for col in range(N):
            recs[row, col] = best[col]
    return np.asarray(recs)
# Working example
tmp = test()

I profiled it: np.argpartition consumes 60% of the function's time and uses only one core. I'm trying to make it parallel, because I have a server with 80 cores. So I perform the .dot operation on a subset of users (which uses multiple cores) and plan to fill an empty predefined array with the numpy sorting results (which use a single core) in parallel, but I'm stuck with the error from the question title:

%%cython -f
# cython: language_level=3
# cython: boundscheck=False
# cython: wraparound=False
# cython: linetrace=True
# cython: binding=True
# distutils: define_macros=CYTHON_TRACE_NOGIL=1
from cython.parallel import parallel, prange
import numpy as np
from tqdm import tqdm
from math import ceil
def test(int N=10, show_progress=True, int num_threads=1):
    """Attempt to rank each batch of score rows inside a parallel ``prange``.

    Builds a zeroed (users_c, N) ``intc`` table, then for each of 5 batches
    computes a (num_threads, items) score matrix with ``.dot`` and tries to
    pick the N best item indices of every row in a ``nogil`` ``prange`` loop.

    ``show_progress`` toggles the tqdm progress bar; ``num_threads`` is both
    the batch size and the number of prange threads.

    Returns a numpy array view of the recommendations table.
    """
    # Define User and Item count and loops indexes
    cdef int users_c = 11402139//1000, items_c = 134751//100, u, i, u_b
    # Predefine zero 2-D C-ordered array for recommendations
    cdef int[:,::1] users_recs = np.zeros((users_c, N), dtype=np.intc)
    # Define memoryview var
    cdef float[:,::1] users_items_scores_mv
    progress = tqdm(total=users_c, disable=not show_progress)
    # For a batch of Users
    for u_b in range(5):
        # Use .dot operation which use multiple cores
        users_items_scores = np.random.rand(num_threads, 10).dot(np.random.rand(134751//100, 10).T)
        # Create memory view to 2-D array, which I'm trying to sort row wise
        # NOTE(review): np.random.rand presumably yields float64 here while the
        # memoryview is declared as C float - confirm the dtypes match
        users_items_scores_mv = users_items_scores
        # Here it starts, try to use numpy sorting in parallel
        for u in prange(num_threads, nogil=True, num_threads=num_threads):
            # ids_partial / ids_top are untyped, i.e. Python objects produced by
            # numpy calls, and they are assigned inside a nogil section
            ids_partial = np.argpartition(users_items_scores_mv[u], items_c-N)[items_c-N:]
            ids_top = ids_partial[np.argsort(users_items_scores_mv[u][ids_partial])]
            # Fill predefined 2-D array
            for i in range(N):
                # NOTE(review): u_b advances by 1 per batch while u spans
                # num_threads rows, so batches overlap when num_threads > 1 -
                # presumably u_b * num_threads + u was intended; confirm
                users_recs[u_b + u, i] = ids_top[i]
        progress.update(num_threads)
    progress.close()
    return np.asarray(users_recs)

and got this (full error):

Error compiling Cython file:
------------------------------------------------------------
...
        # Create memory view to 2-D array,
        # which I'm trying to sort row wise
        users_items_scores_mv = users_items_scores
        # Here it starts, try to use numpy sorting in parallel
        for u in prange(num_threads, nogil=True, num_threads=num_threads):
            ids_partial = np.argpartition(users_items_scores_mv[u], items_c-N)[items_c-N:]
           ^
------------------------------------------------------------

/datascc/enn/.cache/ipython/cython/_cython_magic_201b296cd5a34240b4c0c6ed3e58de7c.pyx:31:12: Assignment of Python object not allowed without gil

I read about memory views and malloc-based allocation but haven't found an example applicable to my situation.


Solution

  • I ended up with a custom C++ function that fills a numpy array in parallel with nogil via OpenMP. It required re-implementing numpy's argpartition partial sorting for use from Cython. The algorithm is like this (steps 3-4 can be looped):

    1. define empty array A[i,j] and memory view B_mv[i,k]; where "i" is batch size, "j" some columns and "k" number of desired items to be returned after sorting
    2. create pointers on A&B's memory
    3. run some calculations and fill A with data
    4. iterate in parallel over i-s and fill B
    5. transform result into readable form

    Solution consists of:

    topnc.h - header of custom function implementation:

    /* "Copyright [2019] <Tych0n>"  [legal/copyright] */
    #ifndef IMPLICIT_TOPNC_H_
    #define IMPLICIT_TOPNC_H_
    
    // Writes the column indices of the `ktop` largest values of one row of `A`
    // into one row of `B`, ordered from the largest value to the smallest.
    //   A      - flat float buffer, read as rows of `m_cols` values each
    //   n_row  - index of the row of A to rank
    //   m_row  - index of the row of B that receives the result
    //   m_cols - number of values per row of A
    //   ktop   - number of indices written, and the row length of B
    //   B      - flat int buffer, written as rows of `ktop` indices each
    extern void fargsort_c(float A[], int n_row, int m_row, int m_cols, int ktop, int B[]);
    
    #endif  // IMPLICIT_TOPNC_H_
    

    topnc.cpp - body of the function:

    #include <vector>
    #include <limits>
    #include <algorithm>
    #include <iostream>
    
    #include "topnc.h"
    
    // One scored column of a row: the column index and the value stored there.
    struct target {int index; float value;};
    // Strict "greater than" on the value, so sorting puts the largest values first.
    bool targets_compare(target t_i, target t_j) { return (t_i.value > t_j.value); }
    
    // Ranks one row of a row-major float matrix and stores the column indices of
    // its largest values, from largest to smallest.
    //
    //   A      - flat buffer read as rows of `m_cols` floats
    //   n_row  - row of A to rank
    //   m_row  - row of B that receives the result
    //   m_cols - number of columns in A
    //   ktop   - number of indices requested; also the row length of B
    //   B      - flat buffer written as rows of `ktop` ints
    //
    // Nothing is written when `m_cols` or `ktop` is not positive. When `ktop`
    // exceeds `m_cols`, only `m_cols` real indices exist; the remaining slots of
    // the output row are set to -1 so they cannot be mistaken for column 0.
    void fargsort_c ( float A[], int n_row, int m_row, int m_cols, int ktop, int B[] ) {
        if ( m_cols <= 0 || ktop <= 0 ) {
            return;
        }
        // Never sort past the end of the row: begin() + ktop beyond end() is
        // undefined behaviour for std::partial_sort.
        const int k = std::min( ktop, m_cols );
    
        // Widen before multiplying so large matrices do not overflow int.
        const float* row = A + static_cast<long long>(n_row) * m_cols;
        int* out = B + static_cast<long long>(m_row) * ktop;
    
        // Size the vector once instead of growing it with push_back.
        std::vector<target> targets(m_cols);
        for ( int j = 0; j < m_cols; j++ ) {
            targets[j].index = j;
            targets[j].value = row[j];
        }
    
        // partial_sort already leaves the first k elements fully ordered, so no
        // second sort of that range is needed.
        std::partial_sort( targets.begin(), targets.begin() + k, targets.end(), targets_compare );
    
        for ( int j = 0; j < k; j++ ) {
            out[j] = targets[j].index;
        }
        for ( int j = k; j < ktop; j++ ) {
            out[j] = -1;
        }
    }
    

    ctools.pyx - example usage

    # distutils: language = c++
    # cython: language_level=3
    # cython: boundscheck=False
    # cython: wraparound=False
    # cython: nonecheck=False
    from cython.parallel import parallel, prange
    import numpy as np
    cimport numpy as np
    
    # Declare the C++ helper from topnc.h; `nogil` marks it as callable without
    # holding the GIL, which the prange loop below relies on.
    cdef extern from "topnc.h":
        cdef void fargsort_c ( float A[], int n_row, int m_row, int m_cols, int ktop, int B[] ) nogil
    
    # Example input: 1000 rows x 100 columns of random float32 scores.
    A = np.zeros((1000, 100), dtype=np.float32)
    A[:] = np.random.rand(1000, 100).astype(np.float32)
    cdef:
        # C-contiguous typed view over A, and a raw pointer to its first element
        float[:,::1] A_mv = A
        float* A_mv_p = &A_mv[0,0]
        # Output table: 5 item indices per row, plus a raw pointer to its start
        int[:,::1] B_mv = np.zeros((1000, 5), dtype=np.intc)
        int* B_mv_p = &B_mv[0,0]
        # Loop counter for prange
        int i
    # Each iteration ranks row i of A (100 columns) and writes 5 indices into
    # row i of B; only C pointers and ints are touched, so no GIL is required.
    for i in prange(1000, nogil=True, num_threads=10, schedule='dynamic'):
        fargsort_c(A_mv_p, i, i, 100, 5, B_mv_p)
    # Expose the filled table as a numpy array
    B = np.asarray(B_mv)
    

    compile.py - compile file; run it by command "python compile.py build_ext --inplace -f" in terminal (this will result in file ctools.cpython-*.so, which you then use for import):

    from os import path
    import numpy
    from setuptools import setup, Extension
    from Cython.Distutils import build_ext
    from Cython.Build import cythonize
    
    # Extension module ``ctools``: the Cython wrapper compiled together with the
    # C++ top-N helper. OpenMP is switched on for both the compile and the link
    # step, and the numpy headers are put on the include path.
    ext_utils = Extension(
        name='ctools',
        sources=['ctools.pyx', 'topnc.cpp'],
        include_dirs=[numpy.get_include()],
        extra_compile_args=['-std=c++0x', '-Os', '-fopenmp'],
        extra_link_args=['-fopenmp'],
        language='c++',
    )
    
    # Packages that must be available before the build itself can start.
    build_requirements = ['setuptools>=18.0', 'cython', 'numpy']
    # Translate the .pyx source so setuptools sees a regular C++ extension.
    cythonized_modules = cythonize([ext_utils])
    
    setup(
        name='ctools',
        setup_requires=build_requirements,
        cmdclass={'build_ext': build_ext},
        ext_modules=cythonized_modules,
    )
    

    It was used to add the "recommend all" functionality to the implicit ALS model.