Search code examples
c++micro-optimization

Fastest way to find the row-wise maximum over submatrices


In a performance-critical code, I am given 2 large matrices, (size in the thousands)

expectations, realizations

of the same size but containing different values. These matrices are both partitioned over columns in the same way, with every submatrix has a different number of columns. Something like this

submat1     submat2     submat3
-----------------------------
|...........| .......| .....|
|...........| .......| .....|
|...........| .......| .....|
|...........| .......| .....|
|...........| .......| .....|
-----------------------------

I need the fastest way to fill a third matrix in the following way (in pseudocode)

for each submatrix
    for each row in submatrix
        pos= argmax(expectations(row,start_submatrix(col):end_submatrix(col)))
        result(row,col) =  realization(row,pos)

That is, for each submatrix, I scan every row, find the position of the largest element in the submatrix of expectations, and place the corresponding value of the realization matrix into the result matrix.

I would like to have the fastest way possible to accomplish that, perhaps via smart parallelization/cache optimization, as this function is where I spend something like 40% of my time in the algorithm. I use visual studio 15.9.6 and Windows 10.

This is my reference C++ implementation, where I use Armadillo (column-major) matrices

#include <iostream>
#include <chrono>
#include <vector>

///Trivial implementation, for illustration purposes
void find_max_vertical_trivial(const arma::mat& expectations, const arma::mat& realizations, arma::mat& results, const arma::uvec & list, const int max_size_action)
{
    const int number_columns_results = results.n_cols;
    const int number_rows = expectations.n_rows;
#pragma omp parallel for schedule(static)
    for (int submatrix_to_process = 0; submatrix_to_process < number_columns_results; submatrix_to_process++)
    {
        const int start_loop = submatrix_to_process * max_size_action;
        //Looping over rows
        for (int current_row = 0; current_row < number_rows; current_row++)
        {
            int candidate = start_loop;
            const int end_loop = candidate + list(submatrix_to_process);
            //Finding the optimal action
            for (int act = candidate + 1; act < end_loop; act++)
            {
                if (expectations(current_row, act) > expectations(current_row, candidate))
                    candidate = act;
            }
            //Placing the corresponding realization into the results
            results(current_row, submatrix_to_process) = realizations(current_row, candidate);
        }
    }
}

here is the fastest way I was able to come up with. Is it possible to improve it?

///Stripped all armadillo functionality, to bare C
void find_max_vertical_optimized(const arma::mat& expectations, const arma::mat& realizations, arma::mat& values, const arma::uvec & list, const int max_block)
{
    const int n_columns = values.n_cols;
    const int number_rows = expectations.n_rows;
    const auto exp_ptr = expectations.memptr();
    const auto real_ptr = realizations.memptr();
    const auto values_ptr = values.memptr();
    const auto list_ptr = list.memptr();
#pragma omp parallel for schedule(static)
    for (int col_position = 0; col_position < n_columns; col_position++)
    {
        const int start_loop = col_position * max_block*number_rows;
        const int end_loop = start_loop + list_ptr[col_position]*number_rows;
        const int position_value = col_position * number_rows;
        for (int row_position = 0; row_position < number_rows; row_position++)
        {
            int candidate = start_loop;
            const auto st_exp = exp_ptr + row_position;
            const auto st_real = real_ptr + row_position;
            const auto st_val = values_ptr + row_position;
            for (int new_candidate = candidate + number_rows; new_candidate < end_loop; new_candidate+= number_rows)
            {
                if (st_exp[new_candidate] > st_exp[candidate])
                    candidate = new_candidate;
            }
            st_val[position_value] = st_real[candidate];
        }
    }
}

and the test part, where I compare the performance

typedef std::chrono::microseconds dur;
const double dur2seconds = 1e6;

//Testing the two methods
int main()
{
    const int max_cols_submatrix = 6; //Typical size: 3-100
    const int n_test = 500;
    const int number_rows = 2000;   //typical size: 1000-10000
    std::vector<int> size_to_test = {4,10,40,100,1000,5000 }; //typical size: 10-5000
    arma::vec time_test(n_test, arma::fill::zeros);
    arma::vec time_trivial(n_test, arma::fill::zeros);

    for (const auto &size_grid : size_to_test) {
        arma::mat expectations(number_rows, max_cols_submatrix*size_grid, arma::fill::randn);
        arma::mat realizations(number_rows, max_cols_submatrix*size_grid, arma::fill::randn);
        arma::mat reference_values(number_rows, size_grid, arma::fill::zeros);
        arma::mat optimized_values(number_rows, size_grid, arma::fill::zeros);
        arma::uvec number_columns_per_submatrix(size_grid);
        //Generate random number of columns per each submatrices
        number_columns_per_submatrix= arma::conv_to<arma::uvec>::from(arma::vec(size_grid,arma::fill::randu)*max_cols_submatrix);
        for (int i = 0; i < n_test; i++) {
            auto st_meas = std::chrono::high_resolution_clock::now();
            find_max_vertical_trivial(expectations, realizations, optimized_values, number_columns_per_submatrix, max_cols_submatrix);
            time_trivial(i) = std::chrono::duration_cast<dur>(std::chrono::high_resolution_clock::now() - st_meas).count() / dur2seconds;;
            st_meas = std::chrono::high_resolution_clock::now();
            find_max_vertical_optimized(expectations, realizations, reference_values, number_columns_per_submatrix, max_cols_submatrix);
            time_test(i) = std::chrono::duration_cast<dur>(std::chrono::high_resolution_clock::now() - st_meas).count() / dur2seconds;
            const auto diff = arma::sum(arma::sum(arma::abs(reference_values - optimized_values)));
            if (diff > 1e-3)
            {
                std::cout <<"Error: " <<diff << "\n";
                throw std::runtime_error("Error");
            }
        }
        std::cout <<"grid size:"<< size_grid << "\n";
        const double mean_time_trivial = arma::mean(time_trivial);
        const double mean_time_opt = arma::mean(time_test);

        std::cout << "Trivial: "<< mean_time_trivial << " s +/-" << 1.95*arma::stddev(time_trivial) / sqrt(n_test) <<"\n";
        std::cout << "Optimized: "<< mean_time_opt <<" s ("<< (mean_time_opt/ mean_time_trivial-1)*100.0 <<" %) "<<"+/-" << 1.95*arma::stddev(time_test) / sqrt(n_test)  << "\n";
    }
}

Solution

  • You can probably optimize for cache with a SIMD loop that reads maybe 8 or 12 full vectors of rows, then the same rows for the next column. (So for 32-bit elements, 8*4 or 8*8 rows in parallel). You're using MSVC which supports x86 SSE2 / AVX2 intrinsics like _mm256_load_ps and _mm256_max_ps, or _mm256_max_epi32.

    If you start at an alignment boundary, then hopefully you read all of each cache line you touch at all. And then the same access pattern in the output. (So you're reading 2 to 6 contiguous cache lines, with a stride between blocks of reads / writes.)

    Or possibly record tmp results somewhere compact (1 value per segment per row) before blowing away more cache writing copies of the same element to every column. But try both ways; mixing reads and writes might let the CPU better overlap work and find more memory-level parallelism.