Search code examples
pythonnumpysortingparallel-processingcython

Parallelizing numpy.sort


I need to sort uint64 arrays of length 1e8-1e9, which is one of the performance bottlenecks in my current project. I have just recently updated numpy v2.0 version, in which the sorting algorithm is significantly optimized. Testing it on my hardware, its about 5x faster than numpy v1.26 version. But currently numpy's sorting algorithm cannot utilize multi-core CPUs even though it uses SIMD.

I tried to parallelize it and sort multiple np.array at the same time. One possible approach is to use numba prange, but numba has always had poor support for numpy sorting. numba.jit even has a slowdown effect on np.sort, and numba v0.60.0 fails to follow up on numpy v2.0's optimizations for sorting (https://github.com/numba/numba/issues/9611). The alternative is cython prange, but cython does not allow the creation of Python objects at nogil. Is there a way to sort numpy.array in parallel using cython or otherwise? If using cpp's parallel sorting libraries, are they faster than numpy's own sorting, taking into account the overhead of data type conversions?

arr=np.random.randint(0,2**64,int(3e8),dtype='uint64')  
sorted_arr=np.sort(arr)  # single thread np.sort takes 4 seconds (numpy v2.0.0)

Solution

  • This answer show why a pure-Python, Numba or Cython implementation certainly cannot be used to write a (reasonably-simple) efficient implementation (this summaries the comments). It provides a C++ version which can be called from CPython. The provided version is fast independently of the Numpy version used (so Numpy 2.0 is not required).


    Why it is certainly not possible directly with Numba/Cython/pure-Python

    I do no think it is possible to call sort of Numpy in parallel with Cython/Numba because of the GIL and many additional issues.

    Regarding Numba, parallel loops need the GIL to be release and no object can be manipulated inside it. The Numba sorting function does not actually call Numpy functions, but its own implementation which does not use the GIL nor create any Python object (which require the GIL to be enabled). The Numba sequential implementation is inefficient anyway. While one can try to re-implement a parallel sort from scratch, the parallel features are too limited for the resulting implementation to be really fast or reasonable simple (or both). Indeed, it is limited to a simple parallel for loop called prange (no atomics, critical sections, barriers, TLS storage, etc.).

    Regarding Cython, prange of Cython requires the GIL to be disabled so creating Python object is not possible in the parallel loop preventing np.sort to be called... Cython provides more parallel features than Numba so re-implementing a parallel sort from scratch seems possible at first glance. However, in practice, it is really complicated (if even possible) to write a fast implementation because of many issues and opened/unknown bugs. Here are the issues I found out while trying to write such a code:

    • OpenMP barriers are not yet available and there is no sane (portable) replacement;
    • critical sections are also not yet available so one need to use manual locks (instead of just #pragma omp critical;
    • arrays must be allocated and freed manually using malloc and free in parallel sections (bug prone and resulting in a more complex code);
    • It is not possible to create views in parallel sections (only outside);
    • Cython does not seems to support well Numpy 2.0 yet causing many compilation errors and also runtime ones (see this post which seems related to this);
    • the documentation of OpenMP functions is rather limited (parts are simply missing);
    • variables of a prange-based loop cannot be reused in a range-based loop outside the prange-loop

    I also tried to use a ThreadPoolExecutor so to call some optimized Cython/Numba functions and circumvent the aforementioned limitations of the two but it resulted in a very slow implementation (slower than just calling np.sort) mainly because of the GIL (nearly no speed up) and Numpy overhead (mainly temporary arrays and more precisely page-faults).


    Efficient parallel C++ solution

    We can write an efficient parallel C++ code performing the following steps:

    • split the input array in N slices
    • perform a bucket sort on each part in parallel so we get M buckets for each slice
    • merge the resulting buckets so to get M buckets from the M x N buckets
    • sort the M buckets in parallel using a SIMD-optimized sort -- this can be done with the x86simdsort C++ library (used internally by Numpy) though it only works on x86-64 CPUs
    • merge the M buckets so to get the final array

    We need to write a BucketList data structure so to add numbers in a variable-size container. This is basically a linked list of chunks. Note a growing std::vector is not efficient because each resize put too much pressure on memory (and std::deque operations are so slow that is is even slower).

    Here is the resulting C++ code:

    // File: wrapper.cpp
    // Assume x86-simd-sort has been cloned in the same directory and built
    #include "x86-simd-sort/lib/x86simdsort.h"
    #include <cstdlib>
    #include <cstring>
    #include <forward_list>
    #include <mutex>
    #include <omp.h>
    
    
    template <typename T, size_t bucketMaxSize>
    struct BucketList
    {
        using Bucket = std::array<T, bucketMaxSize>;
    
        std::forward_list<Bucket> buckets;
        uint32_t bucketCount;
        uint32_t lastBucketSize;
    
        BucketList() : bucketCount(1), lastBucketSize(0)
        {
            buckets.emplace_front();
        }
    
        void push_back(const T& value)
        {
            if (lastBucketSize == bucketMaxSize)
            {
                buckets.emplace_front();
                lastBucketSize = 0;
                bucketCount++;
            }
    
            Bucket* lastBucket = &*buckets.begin();
            (*lastBucket)[lastBucketSize] = value;
            lastBucketSize++;
        }
    
        size_t size() const
        {
            return (size_t(bucketCount) - 1lu) * bucketMaxSize + lastBucketSize;
        }
    
        size_t bucketSize(size_t idx) const
        {
            return idx == 0 ? lastBucketSize : bucketMaxSize;
        }
    };
    
    
    extern "C" void parallel_sort(int64_t* arr, size_t size)
    {
        static const size_t bucketSize = 2048;
        static const size_t radixBits = 11;
        static const size_t bucketCount = 1 << radixBits;
    
        struct alignas(64) Slice
        {
            int64_t* data = nullptr;
            size_t size = 0;
            size_t global_offset = 0;
            size_t local_offset = 0;
            std::mutex mutex;
        };
    
        std::array<Slice, bucketCount> slices;
    
        #pragma omp parallel
        {
            std::array<BucketList<int64_t, bucketSize>, bucketCount> tlsBuckets;
    
            #pragma omp for nowait
            for (size_t i = 0; i < size; ++i)
            {
                constexpr uint64_t signBit = uint64_t(1) << uint64_t(63);
                const uint64_t idx = (uint64_t(arr[i]) ^ signBit) >> (64 - radixBits);
                tlsBuckets[idx].push_back(arr[i]);
            }
    
            #pragma omp critical
            for (size_t i = 0; i < bucketCount; ++i)
                slices[i].size += tlsBuckets[i].size();
    
            #pragma omp barrier
    
            #pragma omp single
            {
                size_t offset = 0;
    
                for (size_t i = 0; i < bucketCount; ++i)
                {
                    Slice& slice = slices[i];
                    slice.data = &arr[offset];
                    slice.global_offset = offset;
                    offset += slice.size;
                }
            }
    
            for (size_t i = 0; i < bucketCount; ++i)
            {
                Slice& slice = slices[i];
                size_t local_offset;
                size_t local_offset_end;
    
                {
                    std::scoped_lock lock(slice.mutex);
                    local_offset = slice.local_offset;
                    slice.local_offset += tlsBuckets[i].size();
                    local_offset_end = slice.local_offset;
                }
    
                uint32_t bucketListId = 0;
    
                for(const auto& kv : tlsBuckets[i].buckets)
                {
                    const size_t actualBucketSize = tlsBuckets[i].bucketSize(bucketListId);
                    memcpy(&slice.data[local_offset], &kv[0], sizeof(int64_t) * actualBucketSize);
                    local_offset += actualBucketSize;
                    bucketListId++;
                }
            }
    
            #pragma omp barrier
    
            #pragma omp for schedule(dynamic)
            for (size_t i = 0; i < bucketCount; ++i)
                x86simdsort::qsort(&slices[i].data[0], slices[i].size);
        }
    }
    

    A simple header can be written if you want to call this implementation from Cython (though it can be complicated due to the aforementioned Cython/Numpy-2.0 compatibility issue). Here is an example:

    // File: wrapper.h
    #include <stdlib.h>
    #include <stdint.h>
    void parallel_sort(int64_t* arr, size_t size)
    

    You can compile the code with Clang using the following command lines on Linux:

    clang++ -O3 -fopenmp -c wrapper.cpp -fPIC -g
    clang wrapper.o -o wrapper.so -fopenmp --shared -Lx86-simd-sort/build -lx86simdsortcpp
    

    The following one may also be needed to find the x86-simd-sort library at runtime once cloned and built:

    export LD_LIBRARY_PATH=x86-simd-sort/build:$LD_LIBRARY_PATH
    

    You can finally use the fast sorting function from a Python code. I personally use ctypes because it worked directly with no issues (except when the code is compiled with GCC for unknown strange reasons). Here is an example:

    import numpy as np
    
    import ctypes
    lib = ctypes.CDLL('./wrapper.so')
    parallel_sort = lib.parallel_sort
    parallel_sort.argtypes = [ctypes.c_voidp, ctypes.c_size_t]
    parallel_sort.restype = None
    
    fullCheck = False
    
    print('Generating...')
    a = np.random.randint(0, (1<<63) - 1, 1024*1024**2)
    if fullCheck: b = a.copy()
    
    print('Benchmark...')
    #%time a.sort()
    %time parallel_sort(a.ctypes.data, a.size)
    
    print('Full check...' if fullCheck else 'Check...')
    if fullCheck:
        b.sort()
        assert np.array_equal(b, a)
    else:
        assert np.all(np.diff(a) >= 0)
    

    Notes and performance results

    Note this require a lot of memory to do the test, especially if fullCheck is set to true.

    Note that the C++ code is optimized for sorting huge arrays (with >1e8 items). The memory consumption will be significant for smaller arrays compared to their size. The current code will even be slow for small arrays (<1e5). You can tune constants/parameters regarding your needs. For tiny arrays, you can directly call the x86-simd-sort library. Once tuned properly, it should be faster than np.sort for all arrays (whatever their size). I strongly advise you to tune the parameters regarding your specific input and target CPU, especially radixBits. The current code/parameters are optimized for mainstream Intel CPUs (not recent big-little Intel ones nor AMD ones) and positive numbers. If you know there are only positive numbers in the input, you can skip the most-significantly bit (sign bit).

    Here is the resulting timings on my 6-core i5-9600KF CPU (with Numpy 2.0):

    np.sort:             19.3 s
    Proposed C++ code:    4.3 s
    

    The C++ parallel implementation is 4.5 times faster than the sequential optimized Numpy one.

    Note I did not massively test the code but basic checks like the one proposed in the provided Python script reported no error so far (even on negative numbers apparently).

    Note that this sort is efficient if the highest bits of the sorted numbers are different set (this is the downside of bucket/radix sorts). Ideally, numbers should be uniformly distributed and use all the highest bits. If this is not the case, then the buckets will be unbalanced resulting in a lower scalability. In the worst case, only 1 bucket is used resulting in a serial implementation. You can track the highest bit set so to mitigate this issue. More complex approaches are required when there are some rare big outliers (eg. remapping preserving the ordering).