Search code examples
c++ssesimdintrinsicsavx

optimising column-wise maximum with SIMD


I have this function where I spent significant amount of time in my code, and I would like to optimise it by vectorization-SIMD-compiler intrinsics, if possible.

It essentially finds the value and the location of the maximum over a matrix over columns, and stores them:

  • val_ptr: input matrix: column-major (Fortran-style) n_rows-by-n_cols (where typically n_rows>>n_cols)
  • opt_pos_ptr : int vector of length n_rows where to store the position of the maximum. On entry filled with zeros.
  • max_ptr: float vector of length n_rows where to store the maximum. On entry filled with copies of the first column of val_ptr
  • The function will be called in a parallel loop
  • The memory region are guaranteed to be not overlapping
  • I don't really need the max_ptr to be filled, currently it is just used for book-keeping and to avoid memory allocation
  • I use MSVC, C++17 on Windows 10. Meant to run modern Intel CPUs

The code, where the template type are meant to be float or double:

template <typename eT>
find_max(const int n_cols, 
         const int n_rows, 
         const eT* val_ptr,
         int* opt_pos_ptr,
         eT* max_ptr){
    for (int col = 1; col < n_cols; ++col)
    {
        //Getting the pointer to the beginning of the column
        const auto* value_col = val_ptr + col * n_rows;
        //Looping over the rows
        for (int row = 0; row < n_rows; ++row)
        {
            //If the value is larger than the current maximum, we replace and we store its positions
            if (value_col[row] > max_ptr[row])
            {
                max_ptr[row] = value_col[row];
                opt_pos_ptr[row] = col;
            }
        }
    }
}

What I tried so far:

  • I tried to use OpenMP parallel for on the inner loop, but brings something only on very large rows, bit larger than my current usage.
  • The if in inner loop prevents #pragma omp simd to work, and I was not able to rewrite it without it.

Solution

  • Based on the code sample you posted, it looks like you want to compute a vertical maximum value, meaning that in your case "columns" are horizontal. In C/C++ horizontal sequences of elements (i.e. where two adjacent elements have distance of one element in memory) are normally called rows and vertical (where two adjacent elements have distance of row size in memory) - columns. In my answer below I will be using the traditional terminology, where rows are horizontal and columns are vertical.

    Also, for brevity I will be focusing on one possible type of the matrix element - float. The basic idea is the same for double, with the main difference being the number of elements per vector and the _ps/_pd intrinsics selection. I'll provide a version for double at the end.


    The idea is that you can compute vertical maximum for multiple columns in parallel using _mm_max_ps/_mm_max_pd. In order to also record the position of the found maximum, you can compare the previous maximum with the current elements. The result of the comparison is a mask, where the elements are all-ones where the maximum is updated. That mask can be used to select which position needs to be updated as well.

    I must note that the algorithm below assumes that it is not important which max element's position is recorded, if there are multiple equal max elements in a column. Also, I assume the matrix does not contain NaN values, which would affect the comparisons. More on this later.

    void find_max(const int n_cols, 
             const int n_rows, 
             const float* val_ptr,
             int* opt_pos_ptr,
             float* max_ptr){
        const __m128i mm_one = _mm_set1_epi32(1);
    
        // Pre-compute the number of rows that can be processed in full vector width.
        // In a 128-bit vector there are 4 floats or 2 doubles
        int tail_size = n_rows & 3;
        int n_rows_aligned = n_rows - tail_size;
        int row = 0;
        for (; row < n_rows_aligned; row += 4)
        {
            const auto* col_ptr = val_ptr + row;
            __m128 mm_max = _mm_loadu_ps(col_ptr);
            __m128i mm_max_pos = _mm_setzero_si128();
            __m128i mm_pos = mm_one;
            col_ptr += n_rows;
            for (int col = 1; col < n_cols; ++col)
            {
                __m128 mm_value = _mm_loadu_ps(col_ptr);
    
                // See if this value is greater than the old maximum
                __m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
                // If it is, save its position
                mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));
    
                // Compute the maximum
                mm_max = _mm_max_ps(mm_value, mm_max);
    
                mm_pos = _mm_add_epi32(mm_pos, mm_one);
                col_ptr += n_rows;
            }
    
            // Store the results
            _mm_storeu_ps(max_ptr + row, mm_max);
            _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
        }
    
        // Process tail serially
        for (; row < n_rows; ++row)
        {
            const auto* col_ptr = val_ptr + row;
            auto max = *col_ptr;
            int max_pos = 0;
            col_ptr += n_rows;
            for (int col = 1; col < n_cols; ++col)
            {
                auto value = *col_ptr;
                if (value > max)
                {
                    max = value;
                    max_pos = col;
                }
    
                col_ptr += n_rows;
            }
    
            max_ptr[row] = max;
            opt_pos_ptr[row] = max_pos;
        }
    }
    

    The code above requires SSE4.1 because of the blending intrinsics. You can replace those with a combination of _mm_and_si128/_ps, _mm_andnot_si128/_ps and _mm_or_si128/_ps, in which case the requirements will be lowered to SSE2. See Intel Intrinsics Guide for more details on the particular intrinsics, including which instruction set extensions they require.


    A note about NaN values. If your matrix can have NaNs, the _mm_cmplt_ps test will always return false. As for _mm_max_ps, it is generally not known what it will return. The maxps instruction that the intrinsic translates to returns its second (source) operand if either of the operands is a NaN, so by arranging the operands of the instruction you can achieve either behavior. However, it is not documented which argument of the _mm_max_ps intrinsic represents which operand of the instruction, and it is even possible that the compiler may use different association in different cases. See this answer for more details.

    In order to ensure the correct behavior wrt. NaNs you could use inline assembler to force the correct order of maxps operands. Unfortunately, that is not an option with MSVC for x86-64 target, which you said you're using, so instead you could reuse the _mm_cmplt_ps result for a second blend like this:

    // Compute the maximum
    mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);
    

    This will suppress NaNs in the resulting max values. If you want to keep NaNs instead, you could use a second comparison to detect NaNs:

    // Detect NaNs
    __m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);
    
    // Compute the maximum
    mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));
    

    You could probably further improve performance of the algorithm above if you use wider vectors (__m256 or __m512) and unroll the outer loop by a small factor, so that at least a cache line worth of row data is loaded on every iteration of the inner loop.


    Here is an example of implementation for double. The important point to note here is that because there are only two double elements per vector and there are still four positions per vector, we have to unroll the outer loop to process two vectors of double at a time and then compress the two masks from comparisons with the previous maximums to blend the 32-bit positions.

    void find_max(const int n_cols, 
             const int n_rows, 
             const double* val_ptr,
             int* opt_pos_ptr,
             double* max_ptr){
        const __m128i mm_one = _mm_set1_epi32(1);
    
        // Pre-compute the number of rows that can be processed in full vector width.
        // In a 128-bit vector there are 2 doubles, but we want to process
        // two vectors at a time.
        int tail_size = n_rows & 3;
        int n_rows_aligned = n_rows - tail_size;
        int row = 0;
        for (; row < n_rows_aligned; row += 4)
        {
            const auto* col_ptr = val_ptr + row;
            __m128d mm_max1 = _mm_loadu_pd(col_ptr);
            __m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
            __m128i mm_max_pos = _mm_setzero_si128();
            __m128i mm_pos = mm_one;
            col_ptr += n_rows;
            for (int col = 1; col < n_cols; ++col)
            {
                __m128d mm_value1 = _mm_loadu_pd(col_ptr);
                __m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);
    
                // See if this value is greater than the old maximum
                __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
                __m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
                // Compress the 2 masks into one
                __m128i mm_mask = _mm_packs_epi32(
                    _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
                // If it is, save its position
                mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
    
                // Compute the maximum
                mm_max1 = _mm_max_pd(mm_value1, mm_max1);
                mm_max2 = _mm_max_pd(mm_value2, mm_max2);
    
                mm_pos = _mm_add_epi32(mm_pos, mm_one);
                col_ptr += n_rows;
            }
    
            // Store the results
            _mm_storeu_pd(max_ptr + row, mm_max1);
            _mm_storeu_pd(max_ptr + row + 2, mm_max2);
            _mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
        }
    
        // Process 2 doubles at once
        if (tail_size >= 2)
        {
            const auto* col_ptr = val_ptr + row;
            __m128d mm_max1 = _mm_loadu_pd(col_ptr);
            __m128i mm_max_pos = _mm_setzero_si128();
            __m128i mm_pos = mm_one;
            col_ptr += n_rows;
            for (int col = 1; col < n_cols; ++col)
            {
                __m128d mm_value1 = _mm_loadu_pd(col_ptr);
    
                // See if this value is greater than the old maximum
                __m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
                // Compress the mask. The upper half doesn't matter.
                __m128i mm_mask = _mm_packs_epi32(
                    _mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
                // If it is, save its position
                mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
    
                // Compute the maximum
                mm_max1 = _mm_max_pd(mm_value1, mm_max1);
    
                mm_pos = _mm_add_epi32(mm_pos, mm_one);
                col_ptr += n_rows;
            }
    
            // Store the results
            _mm_storeu_pd(max_ptr + row, mm_max1);
            // Only store the lower two positions
            _mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
    
            row += 2;
        }
    
        // Process tail serially
        for (; row < n_rows; ++row)
        {
            const auto* col_ptr = val_ptr + row;
            auto max = *col_ptr;
            int max_pos = 0;
            col_ptr += n_rows;
            for (int col = 1; col < n_cols; ++col)
            {
                auto value = *col_ptr;
                if (value > max)
                {
                    max = value;
                    max_pos = col;
                }
    
                col_ptr += n_rows;
            }
    
            max_ptr[row] = max;
            opt_pos_ptr[row] = max_pos;
        }
    }