Search code examples
performancec++11boostbinary-search-treeheap

How to optimize boost heaps to outperform std multiset in heap operations? C++


I have worked on implementing the fast marching method. It's a computational method to solve a special type of differential equation. In particular, this code solves the equation $$|\nabla phi| = 1$$ for $$\phi$$ given a boundary condition where $$\phi = 0$$. Anyways, to achieve a O(n lg n) running time requires the use of a priority queue supporting get_min(), extract_min(), decrease_key() and insert() (or push()). Moreover, the algorithm in question will use n insert() operations, n extract_min() operations and at worst 4n decrease_key() operations. Now, it would seem to me that heap such as the fibonacci_heap in the boost library would outperform an std set by a lot which supports the same operations (decrease key is implemented by erasing the element and readding it). However, this is not the case and I was wondering why?

(I would like to note that it is not possible to use the std priority_queue, as it does not support decrease_key())

Here is the code using a d-ary-heap:

#include <vector> 
#include <limits>
#include <set>
#include <tuple>
#include <iostream>
#include <cmath>
#include <cassert>
#include <boost/heap/d_ary_heap.hpp>

using namespace boost::heap;

// Define epsilon value
#define EPS 0.0000000001

using namespace boost::heap;

struct treedist {
    double d;
    int row;
    int col;
    int ts;
    int known;
    bool operator<(const treedist& rhs) const
    {
        if (d == rhs.d) {
            if (known == rhs.known) {
                return ts > rhs.ts;
            }
            return known < rhs.known;
        }
        return d > rhs.d;
    }
};

struct fmmdist {
    int state;
    double d = -1;
    typename d_ary_heap<treedist,mutable_<true>,arity<2> >::handle_type it;
};

// Matrix representing state of point in fast marching method
std::vector<std::vector <fmmdist> > V;

// Binary tree used to efficiently store the distances
d_ary_heap<treedist,mutable_<true>,arity<2> > distances;

// phi array
std::vector<std::vector<double> > phi;

// Size of grid
int rows; int columns;

// Spatial step
double ddx;

const int dx[4] = {0,1,0,-1};
const int dy[4] = {1,0,-1,0};

// initialize phi array for testing purposes
void initPhi() {
    ddx = 0.001;
    rows = 16001;
    columns = 16001;
    for (int i = 0; i < rows; i++ ) {
        std::vector<double> temp;
        phi.push_back(temp);
        for (int j = 0; j < columns; j++) {
            phi[i].push_back(sqrt(powf(-8 + ddx*i,2) + powf(-8 + ddx*j,2)) - 4);
        }
    }
}

// Initialize the state array
void initState() {
    // 0 means far, 1 means close, 2 means known
    for (int i = 0; i < rows; i++) {
        std::vector<fmmdist> temp;
        V.push_back(temp);
        for (int j = 0; j < columns; j++) {
            struct fmmdist f;
            f.state = 0;
            V[i].push_back(f);
        }
    }
}

// return largest root of quadratic with coef. a, b, c
// This assumes delta >= 0 (this holds for FMM method)
double quadratic(double a, double b, double c) {
    double delta = b * b - 4 * a * c;
    if ( delta > 0 ) {
        double r1 = (-b + std::sqrtf(delta))/(2*a);
        double r2 = (-b - std::sqrtf(delta))/(2*a);
        if (r1 > r2) {
            return r1;
        } 
        return r2;
    }
    return -b/(2*a);
}

// Initialization of the fast marching method
void initialization(bool inside) {
    // 0 means far, 1 means close, 2 means known
    // inside means we tag the points within the interface as known
    for (int i = 0; i < rows; i++) {
        for (int j = 0; j < columns; j++) {
            if (inside) {
                if (phi[i][j] < EPS) { // bound used to be used here...
                    V[i][j].state = 2;
                    V[i][j].d = 0;
                } 
            } else {
                if (phi[i][j] > -EPS) {
                    V[i][j].state = 2;
                    V[i][j].d = 0;
                }
            }
        }
    }
    // find initial close points
    // see paper by sethian
    for (int i = 0; i < rows; i++) {
        for (int j = 0; j < columns; j++) {
            if (V[i][j].state != 2) {
                int neigh = 0; // mod 16 this represents a specific configuration
                double dist = 0;
                int known = 0;
                std::vector<double> ndist;
                for (int k = 0; k < 4; k++) {
                    if (i + dx[k] >= rows || i + dx[k] < 0 || j + dy[k] >= columns || j + dy[k] < 0) continue;
                    if (V[i + dx[k]][j + dy[k]].state == 2) {
                        neigh += pow(2,k);
                        known++;
                        ndist.push_back(abs(phi[i][j]) * ddx/(abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])) );
                        
                    } 
                }
                if (ndist.size() == 1) {
                    // case a
                    dist = ndist[0];
                } else if (neigh == 3 || neigh == 12 || neigh == 6 || neigh == 9) {
                    // case b
                    dist = quadratic(2,0,-(ndist[0] * ndist[0] * ndist[1] * ndist[1])/(ndist[0] * ndist[0] + ndist[1] * ndist[1]));
                } else if (neigh == 5 || neigh == 10) {
                    // case d
                    dist = fmin(ndist[0],ndist[1]);
                } else if (neigh == 13) {
                    // case c, both vertical
                    double v = fmin(ndist[0],ndist[1]);
                    dist = quadratic(2,0,-(ndist[2] * ndist[2] * v * v)/(ndist[2] * ndist[2] + v*v));
                } else if (neigh == 7) {
                    // case c, both vertical
                    double v = fmin(ndist[0],ndist[2]);
                    dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
                } else if (neigh == 11) {
                    // case c, both horizontal
                    double v = fmin(ndist[1],ndist[2]);
                    dist = quadratic(2,0,-(ndist[0] * ndist[0] * v * v)/(ndist[0] * ndist[0] + v*v));
                } else if (neigh == 14) {
                    // case c, both horizontal
                    double v = fmin(ndist[0],ndist[2]);
                    dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
                } else if (neigh == 15) {
                    // case e
                    double v = fmin(ndist[0],ndist[2]);
                    double h = fmin(ndist[1],ndist[3]);
                    dist = quadratic(2,0,-(h * h * v * v)/(h * h + v*v));
                }
                if (neigh > 0) {
                    // add to narrow band
                    struct treedist t;
                    t.d = dist; t.row = i; t.col = j; t.ts = 0; t.known = known;
                    V[i][j].state = 1;
                    V[i][j].d = dist;
                    V[i][j].it = distances.push(t); 
                }
            }
        }
    }
}

// Determine x coefficients of backward/forward difference 
std::tuple<double, double, double> rowCoef(int row, int column, bool backward) {
    int row1; 
    if (backward) {
        row1 = row - 1; 
    } else {
        row1 = row + 1; 
    }
    return std::make_tuple(1.0, -2.0 * V[row1][column].d, V[row1][column].d * V[row1][column].d );
}

// Determine y coefficients of backward/forward difference 
std::tuple<double, double, double> columnCoef(int row, int column, bool backward) {
    int col1;
    if (backward) {
        col1 = column - 1; 
    } else {
        col1 = column + 1; 
    }
    return std::make_tuple(1.0, -2.0 * V[row][col1].d, V[row][col1].d * V[row][col1].d );       
}

double computeDist(int row, int column) {
    // Determine the x-coefficients
    std::tuple<double, double, double> coefx = std::make_tuple(0,0,0);
    if (row + 1 < rows && row - 1 >= 0 ) {
        if(V[row + 1][column].state == 2 && V[row - 1][column].state == 2) coefx = rowCoef(row, column, V[row - 1][column].d < V[row + 1][column].d);
        else if (V[row + 1][column].state == 2 ) coefx = rowCoef(row, column, false);
        else if (V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
    } else if ( row + 1 < rows) {
        if(V[row + 1][column].state == 2) coefx = rowCoef(row, column, false);
    } else if ( row - 1 >= 0) {
        if(V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
    }
    // Determine the y-coefficients
    std::tuple<double, double, double> coefy = std::make_tuple(0,0,0);
    if (column + 1 < columns && column - 1 >= 0) {
        if (V[row][column + 1].state == 2 && V[row][column - 1].state == 2) coefy = columnCoef(row, column, V[row][column - 1].d < V[row][column + 1].d);
        else if (V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
        else if (V[row][column - 1].state == 2 ) coefy = columnCoef(row, column, true);
    } else if ( column + 1 < columns) {
        if(V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
    } else if ( column - 1 >= 0){
        if(V[row][column - 1].state == 2) coefy = columnCoef(row, column, true);
    } 

    // return the largest root of the quadratic
    double a = std::get<0>(coefx) + std::get<0>(coefy);
    double b = std::get<1>(coefx) + std::get<1>(coefy);
    double c = std::get<2>(coefx) + std::get<2>(coefy) - powf(ddx,2);
    double result = quadratic(a,b,c);
    assert(!isnan(result));
    return result;
}

// Fast marching loop including simultaneous velocity extension
void loopFMVel() {
    int count = 1;
    while(distances.size() > 0) {
        // extract closest
        struct treedist temp = distances.top();
        int row = temp.row; int column = temp.col;
        distances.pop();
        V[row][column].state = 2;
        for (int k = 0; k < 4; k++) {
            if (row + dx[k] >= rows || row + dx[k] < 0 || column + dy[k] >= columns || column + dy[k] < 0) continue;
            if (V[row + dx[k]][column + dy[k]].state == 2) continue;
            double d;
            if (V[row + dx[k]][column + dy[k]].state == 1) {
                d = computeDist(row + dx[k], column + dy[k]);
                if (d < (*V[row + dx[k]][column + dy[k]].it).d) {
                    (*V[row + dx[k]][column + dy[k]].it).d = d;
                    (*V[row + dx[k]][column + dy[k]].it).known = (*V[row + dx[k]][column + dy[k]].it).known + 1;
                    V[row + dx[k]][column + dy[k]].d = d;
                    distances.increase(V[row + dx[k]][column + dy[k]].it);
                }
            } else if (V[row + dx[k]][column + dy[k]].state == 0) {
                d = computeDist(row + dx[k], column + dy[k]);
                struct treedist t;
                t.d = d; t.row = row + dx[k]; t.col = column + dy[k]; t.ts = count; t.known = 1;
                V[row + dx[k]][column + dy[k]].state = 1;
                V[row + dx[k]][column + dy[k]].d = d;
                V[row + dx[k]][column + dy[k]].it = distances.push(t);
            }
        }
        count++;
    }
}

int main() {
    distances.reserve(pow(2056,2)); 
    initPhi();
    initState();
    std::cout << "Finished state init " << std::endl;
    initialization(true);
    std::cout << "Finished init of FMM " << std::endl;
    loopFMVel();
    return 0;
}

Here is code using a multiset:

#include <vector> 
#include <limits>
#include <set>
#include <tuple>
#include <iostream>
#include <cmath>
#include <cassert>

// Define epsilon value
#define EPS 0.0000000001

struct treedist {
    double d;
    int row;
    int col;
    int ts;
    int known;
    bool operator<(const treedist& rhs) const
    {
        if (d == rhs.d) {
            if (known == rhs.known) {
                return ts < rhs.ts;
            }
            return known > rhs.known;
        }
        return d < rhs.d;
    }
};

struct fmmdist {
    int state;
    double d = -1;
    std::multiset<treedist>::iterator it;
};

// Matrix representing state of point in fast marching method
std::vector<std::vector <fmmdist> > V;

// Binary tree used to efficiently store the distances
std::multiset<treedist> distances;

// phi array
std::vector<std::vector<double> > phi;

// Size of grid
int rows; int columns;

// Spatial step
double ddx;

const int dx[4] = {0,1,0,-1};
const int dy[4] = {1,0,-1,0};

// initialize phi array for testing purposes
void initPhi() {
    ddx = 0.001;
    rows = 16001;
    columns = 16001;
    for (int i = 0; i < rows; i++ ) {
        std::vector<double> temp;
        phi.push_back(temp);
        for (int j = 0; j < columns; j++) {
            phi[i].push_back(sqrt(powf(-8 + ddx*i,2) + powf(-8 + ddx*j,2)) - 4);
        }
    }
}

// Initialize the state array
void initState() {
    // 0 means far, 1 means close, 2 means known
    for (int i = 0; i < rows; i++) {
        std::vector<fmmdist> temp;
        V.push_back(temp);
        for (int j = 0; j < columns; j++) {
            struct fmmdist f;
            f.state = 0;
            V[i].push_back(f);
        }
    }
}

// return largest root of quadratic with coef. a, b, c
double quadratic(double a, double b, double c) {
    double delta = b * b - 4 * a * c;
    if ( delta > 0 ) {
        double r1 = (-b + std::sqrtf(delta))/(2*a);
        double r2 = (-b - std::sqrtf(delta))/(2*a);
        if (r1 > r2) {
            return r1;
        } 
        return r2;
    }
    return -b/(2*a);
}

// Initialization of the fast marching method
void initialization(bool inside) {
    // 0 means far, 1 means close, 2 means known
    // inside means we tag the points within the interface as known
    for (int i = 0; i < rows; i++) {
        for (int j = 0; j < columns; j++) {
            if (inside) {
                if (phi[i][j] < EPS) { // bound used to be used here...
                    V[i][j].state = 2;
                    V[i][j].d = 0;
                } 
            } else {
                if (phi[i][j] > -EPS) {
                    V[i][j].state = 2;
                    V[i][j].d = 0;
                }
            }
        }
    }
    // find initial close points
    // see paper by sethian
    for (int i = 0; i < rows; i++) {
        for (int j = 0; j < columns; j++) {
            if (V[i][j].state != 2) {
                int neigh = 0; // mod 16 this represents a specific configuration
                double dist = 0;
                int known = 0;
                std::vector<double> ndist;
                for (int k = 0; k < 4; k++) {
                    if (i + dx[k] >= rows || i + dx[k] < 0 || j + dy[k] >= columns || j + dy[k] < 0) continue;
                    if (V[i + dx[k]][j + dy[k]].state == 2) {
                        neigh += pow(2,k);
                        known++;
                        ndist.push_back(abs(phi[i][j]) * ddx/(abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])) );
                        
                    } 
                }
                if (ndist.size() == 1) {
                    // case a
                    dist = ndist[0];
                } else if (neigh == 3 || neigh == 12 || neigh == 6 || neigh == 9) {
                    // case b
                    dist = quadratic(2,0,-(ndist[0] * ndist[0] * ndist[1] * ndist[1])/(ndist[0] * ndist[0] + ndist[1] * ndist[1]));
                } else if (neigh == 5 || neigh == 10) {
                    // case d
                    dist = fmin(ndist[0],ndist[1]);
                } else if (neigh == 13) {
                    // case c, both vertical
                    double v = fmin(ndist[0],ndist[1]);
                    dist = quadratic(2,0,-(ndist[2] * ndist[2] * v * v)/(ndist[2] * ndist[2] + v*v));
                } else if (neigh == 7) {
                    // case c, both vertical
                    double v = fmin(ndist[0],ndist[2]);
                    dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
                } else if (neigh == 11) {
                    // case c, both horizontal
                    double v = fmin(ndist[1],ndist[2]);
                    dist = quadratic(2,0,-(ndist[0] * ndist[0] * v * v)/(ndist[0] * ndist[0] + v*v));
                } else if (neigh == 14) {
                    // case c, both horizontal
                    double v = fmin(ndist[0],ndist[2]);
                    dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
                } else if (neigh == 15) {
                    // case e
                    double v = fmin(ndist[0],ndist[2]);
                    double h = fmin(ndist[1],ndist[3]);
                    dist = quadratic(2,0,-(h * h * v * v)/(h * h + v*v));
                }
                if (neigh > 0) {
                    // add to narrow band
                    struct treedist t;
                    t.d = dist; t.row = i; t.col = j; t.ts = 0; t.known = known;
                    V[i][j].state = 1;
                    V[i][j].d = dist;
                    V[i][j].it = distances.insert(t); 
                }
            }
        }
    }
}

// Determine x coefficients of backward/forward difference 
std::tuple<double, double, double> rowCoef(int row, int column, bool backward) {
    int row1; 
    if (backward) {
        row1 = row - 1; 
    } else {
        row1 = row + 1;
    }
    return std::make_tuple(1.0, -2.0 * V[row1][column].d, V[row1][column].d * V[row1][column].d );
}

// Determine y coefficients of backward/forward difference 
std::tuple<double, double, double> columnCoef(int row, int column, bool backward) {
    int col1; 
    if (backward) {
        col1 = column - 1; 
    } else {
        col1 = column + 1; 
    }
    return std::make_tuple(1.0, -2.0 * V[row][col1].d, V[row][col1].d * V[row][col1].d );       
}


double computeDist(int row, int column) {
    // Determine the x-coefficients
    std::tuple<double, double, double> coefx = std::make_tuple(0,0,0);
    if (row + 1 < rows && row - 1 >= 0 ) {
        if(V[row + 1][column].state == 2 && V[row - 1][column].state == 2) coefx = rowCoef(row, column, V[row - 1][column].d < V[row + 1][column].d);
        else if (V[row + 1][column].state == 2 ) coefx = rowCoef(row, column, false);
        else if (V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
    } else if ( row + 1 < rows) {
        if(V[row + 1][column].state == 2) coefx = rowCoef(row, column, false);
    } else if ( row - 1 >= 0) {
        if(V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
    }
    // Determine the y-coefficients
    std::tuple<double, double, double> coefy = std::make_tuple(0,0,0);
    if (column + 1 < columns && column - 1 >= 0) {
        if (V[row][column + 1].state == 2 && V[row][column - 1].state == 2) coefy = columnCoef(row, column, V[row][column - 1].d < V[row][column + 1].d);
        else if (V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
        else if (V[row][column - 1].state == 2 ) coefy = columnCoef(row, column, true);
    } else if ( column + 1 < columns) {
        if(V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
    } else if ( column - 1 >= 0){
        if(V[row][column - 1].state == 2) coefy = columnCoef(row, column, true);
    } 
    // return the largest root of the quadratic
    double a = std::get<0>(coefx) + std::get<0>(coefy);
    double b = std::get<1>(coefx) + std::get<1>(coefy);
    double c = std::get<2>(coefx) + std::get<2>(coefy) - powf(ddx,2);

    double result = quadratic(a,b,c);
    return result;
}

// Fast marching loop i
void loopFMVel() {
    int count = 1;
    while(distances.size() > 0) {
        // extract closest
        std::multiset<treedist>::iterator it = distances.begin();
        int row = (*it).row; int column = (*it).col;
        distances.erase(it);
        V[row][column].state = 2;
        for (int k = 0; k < 4; k++) {
            if (row + dx[k] >= rows || row + dx[k] < 0 || column + dy[k] >= columns || column + dy[k] < 0) continue;
            if (V[row + dx[k]][column + dy[k]].state == 2) continue;
            double d;
            if (V[row + dx[k]][column + dy[k]].state == 1) {
                d = computeDist(row + dx[k], column + dy[k]);
                struct treedist t;
                t.d = (*V[row + dx[k]][column + dy[k]].it).d; t.known = (*V[row + dx[k]][column + dy[k]].it).known;
                t.known++;
                t.row = row + dx[k]; t.col = column + dy[k]; t.ts = (*V[row + dx[k]][column + dy[k]].it).ts;
                if (d < t.d) {
                    t.d = d;
                    V[row + dx[k]][column + dy[k]].d = d;
                }
                distances.erase(V[row + dx[k]][column + dy[k]].it);
                V[row + dx[k]][column + dy[k]].it = distances.insert(t);
            } else if (V[row + dx[k]][column + dy[k]].state == 0) {
                d = computeDist(row + dx[k], column + dy[k]);
                struct treedist t;
                t.d = d; t.row = row + dx[k]; t.col = column + dy[k]; t.ts = count; t.known = 1;
                V[row + dx[k]][column + dy[k]].state = 1;
                V[row + dx[k]][column + dy[k]].d = d;
                V[row + dx[k]][column + dy[k]].it = distances.insert(t);
            }
        }
        count++;
    }
    
    std::cout << "Finished with size " << distances.size() << std::endl;
}

int main() {
    initPhi();
    initState();
    std::cout << "Finished state init " << std::endl;
    initialization(true);
    std::cout << "Finished init of FMM " << std::endl;
    loopFMVel();
    return 0;
}

The following are some results from my testing:

N = 16001^2 (I ran these tests on an M1 max with flags -Ofast -fno-finite-math-only -march=armv8.5-a -mcpu=native -ffast-math) binary_heap (with reserved memory): 58.35s multiset: 63.33s fibonacci_heap: 73.43 16_ary_heap (with reserved memory): 65.40s pairing_heap: 116.5s

I profiled using time. If any more details are required, I will gladly add them.


Solution

  • I spent an inordinate amount of time refactoring the code so I could actually understand it.

    Style/Performance

    While reading through I made some changes that

    • avoid repeated code or subexpressions (which are very error-prone especially in copy-pasted stuff)

    • tried to find "meaningful" names.

    • removed magic constants (if you can comment them, you can use an enum)

    • avoid dynamic allocation and incremental insertions. Unless you define USE_VECTOR my version uses std::array to allocate everything up-front:

       #ifndef USE_VECTOR
           template <typename T, size_t C> using Row              = std::array<T, C>;
           template <typename T, size_t R, size_t C> using Matrix = std::array<Row<T, C>, R>;
       #else
           template <typename T, size_t R, size_t C> struct Matrix {
               std::vector<std::vector<T>> impl_;
               Matrix() : impl_(R, std::vector<T>(C, T{})) {}
      
               auto& operator[](size_t n) const { return impl_[n]; }
               auto& operator[](size_t n)       { return impl_[n]; }
           };
       #endif
      

      Which is used for both the phi and V:

       // phi array
       Matrix<double, rows, columns> phi;
      
       // Matrix representing state of point in fast marching method
       Matrix<fmmdist, rows, columns> V;
      
    • avoid C-isms like sqrtf, prefer std::min over manual compare and swap

    • I rewrote the branches in initialization to always use the same path for the quadratic calculation:

       double dist = pick_distance(ndist, neigh);
      

      Which is implemented (also using a static_vector<double, 4> for the max 4 neighboring distances):

       using NDist = boost::container::static_vector<double, 4>;
      
       static double pick_distance(NDist const& ndist, unsigned configuration)  {
           if (ndist.size() == 1)
               return ndist[0]; // case a
      
           auto q_hv = [](double h, double v) {
               return quadratic(2, 0, -(h * h * v * v) / (h * h + v * v));
           };
      
           switch (configuration) {
               case 3: case 12:
               case 6: case 9:  return q_hv(ndist[0], ndist[1]);                     // case b
               case 5: case 10: return std::min(ndist[0], ndist[1]);                 // case d
               case 13:         return q_hv(ndist[2], std::min(ndist[0], ndist[1])); // case c, both vertical
               case 7:          return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both vertical
               case 11:         return q_hv(ndist[0], std::min(ndist[1], ndist[2])); // case c, both horizontal
               case 14:         return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both horizontal
               case 15:         return q_hv(std::min(ndist[1], ndist[3]), std::min(ndist[0], ndist[2])); // case e
               default:         return 0;
           }
       };
      
    • adding the bits to the neigh bitmask (which is a clever trick, by the way) can be done safer and more effectively:

       neigh |= (1 << k); // instead of neigh += pow(2,k);
      
    • the various variables known and ndist all have pretty redundant state, adhering to the following invariants:

       assert(known >= 0);
       assert(std::popcount(neigh) == known);
       assert(known == static_cast<int>(ndist.size()));
       assert((neigh != 0) == (known != 0));
       assert((neigh == 0) == ndist.empty());
      

      I'd suggest dropping some of the unused values. With these, and extracting the repeated sub-expressions, the initialization function becomes much more readable:

       // Initialization of the fast marching method
       void initialization(bool inside) {
           // inside means we tag the points within the interface as known
           for (int i = 0; i < rows; i++)
               for (int j = 0; j < columns; j++)
                   if (inside != (phi[i][j] >= EPS)) { // bound used to be used here...
                       V[i][j].state = fmmdist::known;
                       V[i][j].dist  = 0;
                   }
      
           // find initial close points
           // see paper by James Sethian
           NDist ndist;
           for (int i = 0; i < rows; i++) {
               for (int j = 0; j < columns; j++) {
                   if (V[i][j].state != fmmdist::known) {
                       unsigned neigh = 0; // mod 16 this represents a specific configuration
      
                       ndist.clear();
                       for (unsigned k = 0; k < dx.size(); k++) {
                           auto const r = i + dx[k];
                           auto const c = j + dy[k];
                           if (r >= rows || r < 0 || c >= columns || c < 0)
                               continue;
      
                           if (V[r][c].state == fmmdist::known) {
                               neigh |= (1 << k);
      
                               using std::abs;
                               ndist.push_back(abs(phi[i][j]) * ddx / (abs(phi[i][j]) + abs(phi[r][c])));
                           }
                       }
      
                       if (neigh) {
                           double dist = pick_distance(ndist, neigh);
      
                           // add to narrow band
                           treedist t;
                           t.dist  = dist;
                           t.coord = {i, j};
                           t.ts    = 0;
                           t.known = ndist.size();
                           V[i][j] = {fmmdist::close, dist, queue.push(t)};
                       }
                   }
               }
           }
       }
      
    • Similar treatment of the loopFMVel() function has a similar effect:

       // Fast marching loop including simultaneous velocity extension
       void loopFMVel() {
           Timestamp tsclock = 1;
      
           while (!queue.empty()) {
               auto [row, column]   = queue.pop();
               V[row][column].state = fmmdist::known;
      
               for (unsigned k = 0; k < dx.size(); k++) {
                   auto const r = row + dx[k];
                   auto const c = column + dy[k];
                   if (r >= rows || r < 0 || c >= columns || c < 0)
                       continue;
      
                   auto& cell = V[r][c]; // TODO better name
                   if (cell.state == fmmdist::known)
                       continue;
      
                   auto dist = computeDist(r, c);
                   if (cell.state == fmmdist::close) {
                       if (auto& old = *cell.handle; dist < old.dist) {
                           cell.dist = dist;
                           queue.increase(cell.handle, dist);
                       }
                   } else if (cell.state == fmmdist::far) {
                       treedist temp;
                       temp.coord = {r, c};
                       temp.dist  = dist;
                       temp.known = 1;
                       temp.ts    = tsclock;
      
                       cell.state  = fmmdist::close;
                       cell.dist   = dist;
                       cell.handle = queue.push(temp);
                   }
               }
               tsclock += 1;
           }
       }
      

    Issues

    Along the way I spotted some potential issues:

    1. Accidentally truncating floating point numbers to integers

      Did you mean to use int ::abs(int) in line 129? Fix it using std::abs to get double std::abs(double):

      using std::abs;
      ndist.push_back(
          abs(phi[i][j]) * ddx /
          (abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])));
      
    2. Inconsistent computeDist. After all my review the function now reads as:

      struct Coef {
          double      a, b, c;
          static Coef make(double d) { return {1.0, -2.0 * d, d * d}; };
      };
      
      double computeDist(int row, int column) {
          // Determine the x-coefficients
          auto closest = [](fmmdist const* next, fmmdist const* prev) {
              if (next && next->state != fmmdist::known) next = nullptr;
              if (prev && prev->state != fmmdist::known) prev = nullptr;
      
              if (!(next || prev))
                  return Coef{0, 0, 0}; // TODO is this okay? might need to be Coef::make(INF)?
      
              double d = std::numeric_limits<double>::infinity();
              if (next) d = std::min(d, next->dist);
              if (prev) d = std::min(d, prev->dist);
              return Coef::make(d);
          };
      
          Coef x = closest(                                   //
              row + 1 < rows ? &V[row + 1][column] : nullptr, //
              row > 0        ? &V[row - 1][column] : nullptr);
          Coef y = closest(                                         //
              column + 1 < columns ? &V[row][column + 1] : nullptr, //
              column > 0           ? &V[row][column - 1] : nullptr);
      
          // return the largest root of the quadratic
          double a = x.a + y.a;
          double b = x.b + y.b;
          double c = x.c + y.c - powf(ddx, 2);
      
          double result = quadratic(a, b, c);
          assert(!std::isnan(result));
          return result;
      }
      

      In this form the distinction between rowCoef and columnCoef went away. And a potential problem is highlighted:

      if (!(next || prev))
          return Coef{0, 0, 0}; // TODO is this okay? might need to be Coef::make(INF)?
      

      If no neighbouring row/column is accessible and in the known, we're defaulting to suddenly different coefficents (0, 0, 0), where I'd probably have expected (2, -Inf, Inf) for consistency. If you agree, we can remove the special-case making the code even more elegant:

      auto closest = [](auto*... candidates) {
          double dist = INFINITY;
      
          for (auto p : {candidates...})
              if (p && p->state == fmmdist::known)
                  dist = std::min(dist, p->dist);
      
          return Coef{1.0, -2.0 * dist, dist * dist};
      };
      
    3. Comparing your versions, I note that the multiset variant behaves differently: it always increases known regardless of whether the distance is decreased.

      enter image description here

      As you've seen above, I assumed that you only want to update like in your heap implementation, to make it a fair comparison.

    Queue Implementations

    I refactored the code so I could switch the queue implementation with the same FMM implementation:

    struct QueueConcept {
        using Handle = /*stable handle type*/;
    
        size_t size()  const;
        bool   empty() const;
    
        Coord pop(); // extract closest
        Handle push(treedist v);
    
        void increase(Handle& h, double dist);
    };
    

    Which is trivially implemented for both multiset and d_ary_heap.

    struct DistanceTree {
        using Storage = std::multiset<treedist, treedist::Less>;
        using Handle  = typename Storage::const_iterator; // must be stable
    
        size_t size()  const { return _storage.size();  }
        bool   empty() const { return _storage.empty(); }
    
        Coord pop() { // extract closest
            Handle handle = _storage.begin();
            Coord  coord  = handle->coord;
            _storage.erase(handle);
            return coord;
        }
    
        Handle push(treedist v) { return _storage.insert(std::move(v)); }
    
        void increase(Handle& h, double dist) {
            treedist v = *h;
            _storage.erase(h);
    
            assert(dist <= v.dist);
            v.dist = dist;
            v.known += 1;
    
            h = push(std::move(v));
        }
    
      private:
        Storage _storage;
    };
    
    struct DistanceHeap {
        using Storage = bh::d_ary_heap<treedist, //
                                       bh::compare<treedist::Greater>, bh::mutable_<true>, bh::arity<2>>;
        using Handle  = Storage::handle_type;
    
        DistanceHeap() { _storage.reserve(2056 * 2056); }
    
        size_t size()  const { return _storage.size();  }
        bool   empty() const { return _storage.empty(); }
    
        Coord pop() { // extract closest
            auto& temp  = _storage.top();
            Coord  coord = temp.coord;
            _storage.pop();
            return coord;
        }
    
        Handle push(treedist v) { return _storage.push(std::move(v)); }
    
        void increase(Handle h, double dist) {
            auto& v = *h;
            assert(dist <= v.dist);
            v.dist = dist;
            v.known += 1;
            _storage.increase(h);
        }
    
      private:
        Storage _storage;
    };
    

    Now I templated the Algorithm code like so:

    template <typename Queue, int rows = 16001, int columns = rows> //
    struct Algorithm {
        Queue queue; // prio queue used to efficiently store the distances
    
        // .. the rest
    };
    

    And added a test-bed:

    #include <chrono>
    #include <iomanip>
    namespace {
        static long elapsed() {
            auto now = std::chrono::high_resolution_clock::now;
            using namespace std::chrono_literals;
            static auto start = now();
            auto        t     = now();
    
            return (t - std::exchange(start, t)) / 1ms;
        }
    
        void trace(auto const&... args) {
            ((std::cout << std::setw(10) << elapsed() << "ms ") << ... << args) << std::endl;
        }
    
        template <typename Queue> void run() {
            trace(__PRETTY_FUNCTION__);
    
            auto a = std::make_unique<Algorithm<Queue, 5601>>();
            trace("Constructed");
            a->initPhi();
            a->initState();
            trace("Finished state init ");
            a->initialization(true);
            trace("Finished init of FMM ");
            a->loopFMVel();
            trace("Done");
        }
    } // namespace
    
    int main(int argc, char**) {
        std::ios::sync_with_stdio(false);
        trace("Start");
        if (argc == 1) run<DistanceTree>();
        if (argc >= 2) run<DistanceHeap>();
        if (argc == 3) run<DistanceTree>();
        trace("Exit");
    }
    

    Live Demo

    Live On Coliru

    #include <array>
    #include <cassert>
    #include <cmath>
    #include <limits>
    #include <memory>
    #include <set>
    #include <tuple>
    #include <utility>
    #include <vector>
    
    #include <boost/container/static_vector.hpp>
    #include <boost/heap/d_ary_heap.hpp>
    namespace bh = boost::heap;
    
    // Define epsilon value
    static inline constexpr double EPS = 1e-10;
    
    #ifndef USE_VECTOR
        template <typename T, size_t C> using Row              = std::array<T, C>;
        template <typename T, size_t R, size_t C> using Matrix = std::array<Row<T, C>, R>;
    #else
        template <typename T, size_t R, size_t C> struct Matrix {
            std::vector<std::vector<T>> impl_;
            Matrix() : impl_(R, std::vector<T>(C, T{})) {}
    
            auto& operator[](size_t n) const { return impl_[n]; }
            auto& operator[](size_t n)       { return impl_[n]; }
        };
    #endif
    
    using Timestamp = unsigned long;
    
    struct Coord { int row, col; };
    
    struct treedist {
        double    dist;
        int       known;
        Timestamp ts;
        Coord     coord;
    
      private:
        constexpr auto key() const { // natural order by nearest, greatest known and earliest ts
            return std::tuple(dist, -known, ts);
        }
    
      public:
        struct Less {
            constexpr bool operator()(treedist const& a, treedist const& b) const { return a.key() < b.key(); }
        };
        struct Greater {
            constexpr bool operator()(treedist const& a, treedist const& b) const { return a.key() > b.key(); }
        };
    };
    
    struct DistanceTree {
        using Storage = std::multiset<treedist, treedist::Less>;
        using Handle  = typename Storage::const_iterator; // must be stable
    
        size_t size()  const { return _storage.size();  }
        bool   empty() const { return _storage.empty(); }
    
        Coord pop() { // extract closest
            Handle handle = _storage.begin();
            Coord  coord  = handle->coord;
            _storage.erase(handle);
            return coord;
        }
    
        Handle push(treedist v) { return _storage.insert(std::move(v)); }
    
        void increase(Handle& h, double dist) {
            treedist v = *h;
            _storage.erase(h);
    
            assert(dist <= v.dist);
            v.dist = dist;
            v.known += 1;
    
            h = push(std::move(v));
        }
    
      private:
        Storage _storage;
    };
    
    struct DistanceHeap {
        using Storage = bh::d_ary_heap<treedist, //
                                       bh::compare<treedist::Greater>, bh::mutable_<true>, bh::arity<2>>;
        using Handle  = Storage::handle_type;
    
        DistanceHeap() { _storage.reserve(2056 * 2056); }
    
        size_t size()  const { return _storage.size();  }
        bool   empty() const { return _storage.empty(); }
    
        Coord pop() { // extract closest
            auto& temp  = _storage.top();
            Coord  coord = temp.coord;
            _storage.pop();
            return coord;
        }
    
        Handle push(treedist v) { return _storage.push(std::move(v)); }
    
        void increase(Handle h, double dist) {
            auto& v = *h;
            assert(dist <= v.dist);
            v.dist = dist;
            v.known += 1;
            _storage.increase(h);
        }
    
      private:
        Storage _storage;
    };
    
    template <typename Queue, int rows = 16001, int columns = rows> //
    struct Algorithm {
        Queue queue; // prio queue used to efficiently store the distances
    
        struct fmmdist {
            enum State { far = 0, close = 1, known = 2 };
            State                  state{far};
            double                 dist{-1};
            typename Queue::Handle handle{};
        };
    
        // phi array
        Matrix<double, rows, columns> phi;
    
        // Matrix representing state of point in fast marching method
        Matrix<fmmdist, rows, columns> V;
    
        // Spatial step
        static double constexpr ddx = 1e-3;
    
        static constexpr std::array<int, 4> dx{0, 1, 0, -1};
        static constexpr std::array<int, 4> dy{1, 0, -1, 0};
    
        // initialize phi array for testing purposes
        inline void initPhi() {
            for (int i = 0; i < rows; i++)
                for (int j = 0; j < columns; j++)
                    phi[i][j] = sqrt(pow(-8 + ddx * i, 2) + pow(-8 + ddx * j, 2)) - 4;
        }
    
        // Initialize the state array
        void initState() {
            for (int i = 0; i < rows; i++)
                for (int j = 0; j < columns; j++)
                    V[i][j] = {fmmdist::far, -1, {}};
        }
    
        // return largest root of quadratic with coef. a, b, c
        // This assumes delta >= 0 (this holds for FMM method)
        static double quadratic(double a, double b, double c) {
            double delta = b * b - 4 * a * c;
            if (delta > 0) {
                double r1 = (-b + std::sqrt(delta)) / (2 * a);
                double r2 = (-b - std::sqrt(delta)) / (2 * a);
                return std::max(r1, r2);
            }
            return -b / (2 * a);
        }
    
        using NDist = boost::container::static_vector<double, 4>;
    
        static double pick_distance(NDist const& ndist, unsigned configuration)  {
            if (ndist.size() == 1)
                return ndist[0]; // case a
    
            auto q_hv = [](double h, double v) {
                return quadratic(2, 0, -(h * h * v * v) / (h * h + v * v));
            };
    
            switch (configuration) {
                case 3: case 12:
                case 6: case 9:  return q_hv(ndist[0], ndist[1]);                     // case b
                case 5: case 10: return std::min(ndist[0], ndist[1]);                 // case d
                case 13:         return q_hv(ndist[2], std::min(ndist[0], ndist[1])); // case c, both vertical
                case 7:          return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both vertical
                case 11:         return q_hv(ndist[0], std::min(ndist[1], ndist[2])); // case c, both horizontal
                case 14:         return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both horizontal
                case 15:         return q_hv(std::min(ndist[1], ndist[3]), std::min(ndist[0], ndist[2])); // case e
                default:         return 0;
            }
        };
    
        // Initialization of the fast marching method
        void initialization(bool inside) {
            // inside means we tag the points within the interface as known
            for (int i = 0; i < rows; i++)
                for (int j = 0; j < columns; j++)
                    if (inside != (phi[i][j] >= EPS)) { // bound used to be used here...
                        V[i][j].state = fmmdist::known;
                        V[i][j].dist  = 0;
                    }
    
            // find initial close points
            // see paper by James Sethian
            NDist ndist;
            for (int i = 0; i < rows; i++) {
                for (int j = 0; j < columns; j++) {
                    if (V[i][j].state != fmmdist::known) {
                        unsigned neigh = 0; // mod 16 this represents a specific configuration
    
                        ndist.clear();
                        for (unsigned k = 0; k < dx.size(); k++) {
                            auto const r = i + dx[k];
                            auto const c = j + dy[k];
                            if (r >= rows || r < 0 || c >= columns || c < 0)
                                continue;
    
                            if (V[r][c].state == fmmdist::known) {
                                neigh |= (1 << k);
    
                                using std::abs;
                                ndist.push_back(abs(phi[i][j]) * ddx / (abs(phi[i][j]) + abs(phi[r][c])));
                            }
                        }
    
                        if (neigh) {
                            double dist = pick_distance(ndist, neigh);
    
                            // add to narrow band
                            treedist t;
                            t.dist  = dist;
                            t.coord = {i, j};
                            t.ts    = 0;
                            t.known = ndist.size();
                            V[i][j] = {fmmdist::close, dist, queue.push(t)};
                        }
                    }
                }
            }
        }
    
        struct Coef {
            double a, b, c;
        };
    
        double computeDist(int row, int column) {
            // Determine the x-coefficients
            auto closest = [](auto*... candidates) {
                double dist = INFINITY;
    
                for (auto p : {candidates...})
                    if (p && p->state == fmmdist::known)
                        dist = std::min(dist, p->dist);
    
                return Coef{1.0, -2.0 * dist, dist * dist};
            };
    
            Coef x = closest(                                   //
                row + 1 < rows ? &V[row + 1][column] : nullptr, //
                row > 0        ? &V[row - 1][column] : nullptr);
            Coef y = closest(                                         //
                column + 1 < columns ? &V[row][column + 1] : nullptr, //
                column > 0           ? &V[row][column - 1] : nullptr);
    
            // return the largest root of the quadratic
            double a = x.a + y.a;
            double b = x.b + y.b;
            double c = x.c + y.c - powf(ddx, 2);
    
            double result = quadratic(a, b, c);
            assert(!std::isnan(result));
            return result;
        }
    
        // Fast marching loop including simultaneous velocity extension
        void loopFMVel() {
            Timestamp tsclock = 1;
    
            while (!queue.empty()) {
                auto [row, column]   = queue.pop();
                V[row][column].state = fmmdist::known;
    
                for (unsigned k = 0; k < dx.size(); k++) {
                    auto const r = row + dx[k];
                    auto const c = column + dy[k];
                    if (r >= rows || r < 0 || c >= columns || c < 0)
                        continue;
    
                    auto& cell = V[r][c]; // TODO better name
                    if (cell.state == fmmdist::known)
                        continue;
    
                    auto dist = computeDist(r, c);
                    if (cell.state == fmmdist::close) {
                        if (auto& old = *cell.handle; dist < old.dist) {
                            cell.dist = dist;
                            queue.increase(cell.handle, dist);
                        }
                    } else if (cell.state == fmmdist::far) {
                        treedist temp;
                        temp.coord = {r, c};
                        temp.dist  = dist;
                        temp.known = 1;
                        temp.ts    = tsclock;
    
                        cell.state  = fmmdist::close;
                        cell.dist   = dist;
                        cell.handle = queue.push(temp);
                    }
                }
                tsclock += 1;
            }
        }
    };
    
    #include <chrono>
    #include <iomanip>
    #include <iostream>
    namespace {
        static long elapsed() {
            auto now = std::chrono::high_resolution_clock::now;
            using namespace std::chrono_literals;
            static auto start = now();
            auto        t     = now();
    
            return (t - std::exchange(start, t)) / 1ms;
        }
    
        void trace(auto const&... args) {
            ((std::cout << std::setw(10) << elapsed() << "ms ") << ... << args) << std::endl;
        }
    
        template <typename Queue> void run() {
            trace(__PRETTY_FUNCTION__);
    
            auto a = std::make_unique<Algorithm<Queue, 5601>>();
            trace("Constructed");
            a->initPhi();
            a->initState();
            trace("Finished state init ");
            a->initialization(true);
            trace("Finished init of FMM ");
            a->loopFMVel();
            trace("Done");
        }
    } // namespace
    
    int main(int argc, char**) {
        std::ios::sync_with_stdio(false);
        trace("Start");
        if (argc == 1) run<DistanceTree>();
        if (argc >= 2) run<DistanceHeap>();
        if (argc == 3) run<DistanceTree>();
        trace("Exit");
    }
    

    Printing (online):

    g++ -std=c++20 -O3 -DNDEBUG -ffast-math -Wall -pedantic -pthread main.cpp && ./a.out heap tree
    
         0ms Start
         0ms void {anonymous}::run() [with Queue = DistanceHeap]
      1529ms Constructed
       263ms Finished state init 
       228ms Finished init of FMM 
     10523ms Done
       267ms void {anonymous}::run() [with Queue = DistanceTree]
      1204ms Constructed
       282ms Finished state init 
       225ms Finished init of FMM 
      8305ms Done
       262ms Exit
    

    Local demo:

    enter image description here