I have worked on implementing the fast marching method. It's a computational method to solve a special type of differential equation. In particular, this code solves the equation $$|\nabla phi| = 1$$ for $$\phi$$ given a boundary condition where $$\phi = 0$$. Anyways, to achieve a O(n lg n) running time requires the use of a priority queue supporting get_min(), extract_min(), decrease_key() and insert() (or push()). Moreover, the algorithm in question will use n insert() operations, n extract_min() operations and at worst 4n decrease_key() operations. Now, it would seem to me that heap such as the fibonacci_heap in the boost library would outperform an std set by a lot which supports the same operations (decrease key is implemented by erasing the element and readding it). However, this is not the case and I was wondering why?
(I would like to note that it is not possible to use the std priority_queue, as it does not support decrease_key())
Here is the code using a d-ary-heap:
#include <vector>
#include <limits>
#include <set>
#include <tuple>
#include <iostream>
#include <cmath>
#include <cassert>
#include <boost/heap/d_ary_heap.hpp>
using namespace boost::heap;
// Define epsilon value
#define EPS 0.0000000001
using namespace boost::heap;
struct treedist {
double d;
int row;
int col;
int ts;
int known;
bool operator<(const treedist& rhs) const
{
if (d == rhs.d) {
if (known == rhs.known) {
return ts > rhs.ts;
}
return known < rhs.known;
}
return d > rhs.d;
}
};
struct fmmdist {
int state;
double d = -1;
typename d_ary_heap<treedist,mutable_<true>,arity<2> >::handle_type it;
};
// Matrix representing state of point in fast marching method
std::vector<std::vector <fmmdist> > V;
// Binary tree used to efficiently store the distances
d_ary_heap<treedist,mutable_<true>,arity<2> > distances;
// phi array
std::vector<std::vector<double> > phi;
// Size of grid
int rows; int columns;
// Spatial step
double ddx;
const int dx[4] = {0,1,0,-1};
const int dy[4] = {1,0,-1,0};
// initialize phi array for testing purposes
void initPhi() {
ddx = 0.001;
rows = 16001;
columns = 16001;
for (int i = 0; i < rows; i++ ) {
std::vector<double> temp;
phi.push_back(temp);
for (int j = 0; j < columns; j++) {
phi[i].push_back(sqrt(powf(-8 + ddx*i,2) + powf(-8 + ddx*j,2)) - 4);
}
}
}
// Initialize the state array
void initState() {
// 0 means far, 1 means close, 2 means known
for (int i = 0; i < rows; i++) {
std::vector<fmmdist> temp;
V.push_back(temp);
for (int j = 0; j < columns; j++) {
struct fmmdist f;
f.state = 0;
V[i].push_back(f);
}
}
}
// return largest root of quadratic with coef. a, b, c
// This assumes delta >= 0 (this holds for FMM method)
double quadratic(double a, double b, double c) {
double delta = b * b - 4 * a * c;
if ( delta > 0 ) {
double r1 = (-b + std::sqrtf(delta))/(2*a);
double r2 = (-b - std::sqrtf(delta))/(2*a);
if (r1 > r2) {
return r1;
}
return r2;
}
return -b/(2*a);
}
// Initialization of the fast marching method
void initialization(bool inside) {
// 0 means far, 1 means close, 2 means known
// inside means we tag the points within the interface as known
for (int i = 0; i < rows; i++) {
for (int j = 0; j < columns; j++) {
if (inside) {
if (phi[i][j] < EPS) { // bound used to be used here...
V[i][j].state = 2;
V[i][j].d = 0;
}
} else {
if (phi[i][j] > -EPS) {
V[i][j].state = 2;
V[i][j].d = 0;
}
}
}
}
// find initial close points
// see paper by sethian
for (int i = 0; i < rows; i++) {
for (int j = 0; j < columns; j++) {
if (V[i][j].state != 2) {
int neigh = 0; // mod 16 this represents a specific configuration
double dist = 0;
int known = 0;
std::vector<double> ndist;
for (int k = 0; k < 4; k++) {
if (i + dx[k] >= rows || i + dx[k] < 0 || j + dy[k] >= columns || j + dy[k] < 0) continue;
if (V[i + dx[k]][j + dy[k]].state == 2) {
neigh += pow(2,k);
known++;
ndist.push_back(abs(phi[i][j]) * ddx/(abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])) );
}
}
if (ndist.size() == 1) {
// case a
dist = ndist[0];
} else if (neigh == 3 || neigh == 12 || neigh == 6 || neigh == 9) {
// case b
dist = quadratic(2,0,-(ndist[0] * ndist[0] * ndist[1] * ndist[1])/(ndist[0] * ndist[0] + ndist[1] * ndist[1]));
} else if (neigh == 5 || neigh == 10) {
// case d
dist = fmin(ndist[0],ndist[1]);
} else if (neigh == 13) {
// case c, both vertical
double v = fmin(ndist[0],ndist[1]);
dist = quadratic(2,0,-(ndist[2] * ndist[2] * v * v)/(ndist[2] * ndist[2] + v*v));
} else if (neigh == 7) {
// case c, both vertical
double v = fmin(ndist[0],ndist[2]);
dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
} else if (neigh == 11) {
// case c, both horizontal
double v = fmin(ndist[1],ndist[2]);
dist = quadratic(2,0,-(ndist[0] * ndist[0] * v * v)/(ndist[0] * ndist[0] + v*v));
} else if (neigh == 14) {
// case c, both horizontal
double v = fmin(ndist[0],ndist[2]);
dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
} else if (neigh == 15) {
// case e
double v = fmin(ndist[0],ndist[2]);
double h = fmin(ndist[1],ndist[3]);
dist = quadratic(2,0,-(h * h * v * v)/(h * h + v*v));
}
if (neigh > 0) {
// add to narrow band
struct treedist t;
t.d = dist; t.row = i; t.col = j; t.ts = 0; t.known = known;
V[i][j].state = 1;
V[i][j].d = dist;
V[i][j].it = distances.push(t);
}
}
}
}
}
// Determine x coefficients of backward/forward difference
std::tuple<double, double, double> rowCoef(int row, int column, bool backward) {
int row1;
if (backward) {
row1 = row - 1;
} else {
row1 = row + 1;
}
return std::make_tuple(1.0, -2.0 * V[row1][column].d, V[row1][column].d * V[row1][column].d );
}
// Determine y coefficients of backward/forward difference
std::tuple<double, double, double> columnCoef(int row, int column, bool backward) {
int col1;
if (backward) {
col1 = column - 1;
} else {
col1 = column + 1;
}
return std::make_tuple(1.0, -2.0 * V[row][col1].d, V[row][col1].d * V[row][col1].d );
}
double computeDist(int row, int column) {
// Determine the x-coefficients
std::tuple<double, double, double> coefx = std::make_tuple(0,0,0);
if (row + 1 < rows && row - 1 >= 0 ) {
if(V[row + 1][column].state == 2 && V[row - 1][column].state == 2) coefx = rowCoef(row, column, V[row - 1][column].d < V[row + 1][column].d);
else if (V[row + 1][column].state == 2 ) coefx = rowCoef(row, column, false);
else if (V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
} else if ( row + 1 < rows) {
if(V[row + 1][column].state == 2) coefx = rowCoef(row, column, false);
} else if ( row - 1 >= 0) {
if(V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
}
// Determine the y-coefficients
std::tuple<double, double, double> coefy = std::make_tuple(0,0,0);
if (column + 1 < columns && column - 1 >= 0) {
if (V[row][column + 1].state == 2 && V[row][column - 1].state == 2) coefy = columnCoef(row, column, V[row][column - 1].d < V[row][column + 1].d);
else if (V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
else if (V[row][column - 1].state == 2 ) coefy = columnCoef(row, column, true);
} else if ( column + 1 < columns) {
if(V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
} else if ( column - 1 >= 0){
if(V[row][column - 1].state == 2) coefy = columnCoef(row, column, true);
}
// return the largest root of the quadratic
double a = std::get<0>(coefx) + std::get<0>(coefy);
double b = std::get<1>(coefx) + std::get<1>(coefy);
double c = std::get<2>(coefx) + std::get<2>(coefy) - powf(ddx,2);
double result = quadratic(a,b,c);
assert(!isnan(result));
return result;
}
// Fast marching loop including simultaneous velocity extension
void loopFMVel() {
int count = 1;
while(distances.size() > 0) {
// extract closest
struct treedist temp = distances.top();
int row = temp.row; int column = temp.col;
distances.pop();
V[row][column].state = 2;
for (int k = 0; k < 4; k++) {
if (row + dx[k] >= rows || row + dx[k] < 0 || column + dy[k] >= columns || column + dy[k] < 0) continue;
if (V[row + dx[k]][column + dy[k]].state == 2) continue;
double d;
if (V[row + dx[k]][column + dy[k]].state == 1) {
d = computeDist(row + dx[k], column + dy[k]);
if (d < (*V[row + dx[k]][column + dy[k]].it).d) {
(*V[row + dx[k]][column + dy[k]].it).d = d;
(*V[row + dx[k]][column + dy[k]].it).known = (*V[row + dx[k]][column + dy[k]].it).known + 1;
V[row + dx[k]][column + dy[k]].d = d;
distances.increase(V[row + dx[k]][column + dy[k]].it);
}
} else if (V[row + dx[k]][column + dy[k]].state == 0) {
d = computeDist(row + dx[k], column + dy[k]);
struct treedist t;
t.d = d; t.row = row + dx[k]; t.col = column + dy[k]; t.ts = count; t.known = 1;
V[row + dx[k]][column + dy[k]].state = 1;
V[row + dx[k]][column + dy[k]].d = d;
V[row + dx[k]][column + dy[k]].it = distances.push(t);
}
}
count++;
}
}
int main() {
distances.reserve(pow(2056,2));
initPhi();
initState();
std::cout << "Finished state init " << std::endl;
initialization(true);
std::cout << "Finished init of FMM " << std::endl;
loopFMVel();
return 0;
}
Here is code using a multiset:
#include <vector>
#include <limits>
#include <set>
#include <tuple>
#include <iostream>
#include <cmath>
#include <cassert>
// Define epsilon value
#define EPS 0.0000000001
struct treedist {
double d;
int row;
int col;
int ts;
int known;
bool operator<(const treedist& rhs) const
{
if (d == rhs.d) {
if (known == rhs.known) {
return ts < rhs.ts;
}
return known > rhs.known;
}
return d < rhs.d;
}
};
struct fmmdist {
int state;
double d = -1;
std::multiset<treedist>::iterator it;
};
// Matrix representing state of point in fast marching method
std::vector<std::vector <fmmdist> > V;
// Binary tree used to efficiently store the distances
std::multiset<treedist> distances;
// phi array
std::vector<std::vector<double> > phi;
// Size of grid
int rows; int columns;
// Spatial step
double ddx;
const int dx[4] = {0,1,0,-1};
const int dy[4] = {1,0,-1,0};
// initialize phi array for testing purposes
void initPhi() {
ddx = 0.001;
rows = 16001;
columns = 16001;
for (int i = 0; i < rows; i++ ) {
std::vector<double> temp;
phi.push_back(temp);
for (int j = 0; j < columns; j++) {
phi[i].push_back(sqrt(powf(-8 + ddx*i,2) + powf(-8 + ddx*j,2)) - 4);
}
}
}
// Initialize the state array
void initState() {
// 0 means far, 1 means close, 2 means known
for (int i = 0; i < rows; i++) {
std::vector<fmmdist> temp;
V.push_back(temp);
for (int j = 0; j < columns; j++) {
struct fmmdist f;
f.state = 0;
V[i].push_back(f);
}
}
}
// return largest root of quadratic with coef. a, b, c
double quadratic(double a, double b, double c) {
double delta = b * b - 4 * a * c;
if ( delta > 0 ) {
double r1 = (-b + std::sqrtf(delta))/(2*a);
double r2 = (-b - std::sqrtf(delta))/(2*a);
if (r1 > r2) {
return r1;
}
return r2;
}
return -b/(2*a);
}
// Initialization of the fast marching method
void initialization(bool inside) {
// 0 means far, 1 means close, 2 means known
// inside means we tag the points within the interface as known
for (int i = 0; i < rows; i++) {
for (int j = 0; j < columns; j++) {
if (inside) {
if (phi[i][j] < EPS) { // bound used to be used here...
V[i][j].state = 2;
V[i][j].d = 0;
}
} else {
if (phi[i][j] > -EPS) {
V[i][j].state = 2;
V[i][j].d = 0;
}
}
}
}
// find initial close points
// see paper by sethian
for (int i = 0; i < rows; i++) {
for (int j = 0; j < columns; j++) {
if (V[i][j].state != 2) {
int neigh = 0; // mod 16 this represents a specific configuration
double dist = 0;
int known = 0;
std::vector<double> ndist;
for (int k = 0; k < 4; k++) {
if (i + dx[k] >= rows || i + dx[k] < 0 || j + dy[k] >= columns || j + dy[k] < 0) continue;
if (V[i + dx[k]][j + dy[k]].state == 2) {
neigh += pow(2,k);
known++;
ndist.push_back(abs(phi[i][j]) * ddx/(abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])) );
}
}
if (ndist.size() == 1) {
// case a
dist = ndist[0];
} else if (neigh == 3 || neigh == 12 || neigh == 6 || neigh == 9) {
// case b
dist = quadratic(2,0,-(ndist[0] * ndist[0] * ndist[1] * ndist[1])/(ndist[0] * ndist[0] + ndist[1] * ndist[1]));
} else if (neigh == 5 || neigh == 10) {
// case d
dist = fmin(ndist[0],ndist[1]);
} else if (neigh == 13) {
// case c, both vertical
double v = fmin(ndist[0],ndist[1]);
dist = quadratic(2,0,-(ndist[2] * ndist[2] * v * v)/(ndist[2] * ndist[2] + v*v));
} else if (neigh == 7) {
// case c, both vertical
double v = fmin(ndist[0],ndist[2]);
dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
} else if (neigh == 11) {
// case c, both horizontal
double v = fmin(ndist[1],ndist[2]);
dist = quadratic(2,0,-(ndist[0] * ndist[0] * v * v)/(ndist[0] * ndist[0] + v*v));
} else if (neigh == 14) {
// case c, both horizontal
double v = fmin(ndist[0],ndist[2]);
dist = quadratic(2,0,-(ndist[1] * ndist[1] * v * v)/(ndist[1] * ndist[1] + v*v));
} else if (neigh == 15) {
// case e
double v = fmin(ndist[0],ndist[2]);
double h = fmin(ndist[1],ndist[3]);
dist = quadratic(2,0,-(h * h * v * v)/(h * h + v*v));
}
if (neigh > 0) {
// add to narrow band
struct treedist t;
t.d = dist; t.row = i; t.col = j; t.ts = 0; t.known = known;
V[i][j].state = 1;
V[i][j].d = dist;
V[i][j].it = distances.insert(t);
}
}
}
}
}
// Determine x coefficients of backward/forward difference
std::tuple<double, double, double> rowCoef(int row, int column, bool backward) {
int row1;
if (backward) {
row1 = row - 1;
} else {
row1 = row + 1;
}
return std::make_tuple(1.0, -2.0 * V[row1][column].d, V[row1][column].d * V[row1][column].d );
}
// Determine y coefficients of backward/forward difference
std::tuple<double, double, double> columnCoef(int row, int column, bool backward) {
int col1;
if (backward) {
col1 = column - 1;
} else {
col1 = column + 1;
}
return std::make_tuple(1.0, -2.0 * V[row][col1].d, V[row][col1].d * V[row][col1].d );
}
double computeDist(int row, int column) {
// Determine the x-coefficients
std::tuple<double, double, double> coefx = std::make_tuple(0,0,0);
if (row + 1 < rows && row - 1 >= 0 ) {
if(V[row + 1][column].state == 2 && V[row - 1][column].state == 2) coefx = rowCoef(row, column, V[row - 1][column].d < V[row + 1][column].d);
else if (V[row + 1][column].state == 2 ) coefx = rowCoef(row, column, false);
else if (V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
} else if ( row + 1 < rows) {
if(V[row + 1][column].state == 2) coefx = rowCoef(row, column, false);
} else if ( row - 1 >= 0) {
if(V[row - 1][column].state == 2) coefx = rowCoef(row, column, true);
}
// Determine the y-coefficients
std::tuple<double, double, double> coefy = std::make_tuple(0,0,0);
if (column + 1 < columns && column - 1 >= 0) {
if (V[row][column + 1].state == 2 && V[row][column - 1].state == 2) coefy = columnCoef(row, column, V[row][column - 1].d < V[row][column + 1].d);
else if (V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
else if (V[row][column - 1].state == 2 ) coefy = columnCoef(row, column, true);
} else if ( column + 1 < columns) {
if(V[row][column + 1].state == 2) coefy = columnCoef(row, column, false);
} else if ( column - 1 >= 0){
if(V[row][column - 1].state == 2) coefy = columnCoef(row, column, true);
}
// return the largest root of the quadratic
double a = std::get<0>(coefx) + std::get<0>(coefy);
double b = std::get<1>(coefx) + std::get<1>(coefy);
double c = std::get<2>(coefx) + std::get<2>(coefy) - powf(ddx,2);
double result = quadratic(a,b,c);
return result;
}
// Fast marching loop i
void loopFMVel() {
int count = 1;
while(distances.size() > 0) {
// extract closest
std::multiset<treedist>::iterator it = distances.begin();
int row = (*it).row; int column = (*it).col;
distances.erase(it);
V[row][column].state = 2;
for (int k = 0; k < 4; k++) {
if (row + dx[k] >= rows || row + dx[k] < 0 || column + dy[k] >= columns || column + dy[k] < 0) continue;
if (V[row + dx[k]][column + dy[k]].state == 2) continue;
double d;
if (V[row + dx[k]][column + dy[k]].state == 1) {
d = computeDist(row + dx[k], column + dy[k]);
struct treedist t;
t.d = (*V[row + dx[k]][column + dy[k]].it).d; t.known = (*V[row + dx[k]][column + dy[k]].it).known;
t.known++;
t.row = row + dx[k]; t.col = column + dy[k]; t.ts = (*V[row + dx[k]][column + dy[k]].it).ts;
if (d < t.d) {
t.d = d;
V[row + dx[k]][column + dy[k]].d = d;
}
distances.erase(V[row + dx[k]][column + dy[k]].it);
V[row + dx[k]][column + dy[k]].it = distances.insert(t);
} else if (V[row + dx[k]][column + dy[k]].state == 0) {
d = computeDist(row + dx[k], column + dy[k]);
struct treedist t;
t.d = d; t.row = row + dx[k]; t.col = column + dy[k]; t.ts = count; t.known = 1;
V[row + dx[k]][column + dy[k]].state = 1;
V[row + dx[k]][column + dy[k]].d = d;
V[row + dx[k]][column + dy[k]].it = distances.insert(t);
}
}
count++;
}
std::cout << "Finished with size " << distances.size() << std::endl;
}
int main() {
initPhi();
initState();
std::cout << "Finished state init " << std::endl;
initialization(true);
std::cout << "Finished init of FMM " << std::endl;
loopFMVel();
return 0;
}
The following are some results from my testing:
N = 16001^2 (I ran these tests on an M1 max with flags -Ofast -fno-finite-math-only -march=armv8.5-a -mcpu=native -ffast-math) binary_heap (with reserved memory): 58.35s multiset: 63.33s fibonacci_heap: 73.43 16_ary_heap (with reserved memory): 65.40s pairing_heap: 116.5s
I profiled using time. If any more details are required, I will gladly add them.
I spent an inordinate amount of time refactoring the code so I could actually understand it.
While reading through I made some changes that
avoid repeated code or subexpressions (which are very error-prone especially in copy-pasted stuff)
tried to find "meaningful" names.
removed magic constants (if you can comment them, you can use an enum)
avoid dynamic allocation and incremental insertions. Unless you define USE_VECTOR
my version uses std::array
to allocate everything up-front:
#ifndef USE_VECTOR
template <typename T, size_t C> using Row = std::array<T, C>;
template <typename T, size_t R, size_t C> using Matrix = std::array<Row<T, C>, R>;
#else
template <typename T, size_t R, size_t C> struct Matrix {
std::vector<std::vector<T>> impl_;
Matrix() : impl_(R, std::vector<T>(C, T{})) {}
auto& operator[](size_t n) const { return impl_[n]; }
auto& operator[](size_t n) { return impl_[n]; }
};
#endif
Which is used for both the phi
and V
:
// phi array
Matrix<double, rows, columns> phi;
// Matrix representing state of point in fast marching method
Matrix<fmmdist, rows, columns> V;
avoid C-isms like sqrtf
, prefer std::min
over manual compare and swap
I rewrote the branches in initialization
to always use the same path for the quadratic
calculation:
double dist = pick_distance(ndist, neigh);
Which is implemented (also using a static_vector<double, 4>
for the max 4 neighboring distances):
using NDist = boost::container::static_vector<double, 4>;
static double pick_distance(NDist const& ndist, unsigned configuration) {
if (ndist.size() == 1)
return ndist[0]; // case a
auto q_hv = [](double h, double v) {
return quadratic(2, 0, -(h * h * v * v) / (h * h + v * v));
};
switch (configuration) {
case 3: case 12:
case 6: case 9: return q_hv(ndist[0], ndist[1]); // case b
case 5: case 10: return std::min(ndist[0], ndist[1]); // case d
case 13: return q_hv(ndist[2], std::min(ndist[0], ndist[1])); // case c, both vertical
case 7: return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both vertical
case 11: return q_hv(ndist[0], std::min(ndist[1], ndist[2])); // case c, both horizontal
case 14: return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both horizontal
case 15: return q_hv(std::min(ndist[1], ndist[3]), std::min(ndist[0], ndist[2])); // case e
default: return 0;
}
};
adding the bits to the neigh
bitmask (which is a clever trick, by the way) can be done safer and more effectively:
neigh |= (1 << k); // instead of neigh += pow(2,k);
the various variables known
and ndist
all have pretty redundant state, adhering to the following invariants:
assert(known >= 0);
assert(std::popcount(neigh) == known);
assert(known == static_cast<int>(ndist.size()));
assert((neigh != 0) == (known != 0));
assert((neigh == 0) == ndist.empty());
I'd suggest dropping some of the unused values. With these, and extracting the repeated sub-expressions, the initialization
function becomes much more readable:
// Initialization of the fast marching method
void initialization(bool inside) {
// inside means we tag the points within the interface as known
for (int i = 0; i < rows; i++)
for (int j = 0; j < columns; j++)
if (inside != (phi[i][j] >= EPS)) { // bound used to be used here...
V[i][j].state = fmmdist::known;
V[i][j].dist = 0;
}
// find initial close points
// see paper by James Sethian
NDist ndist;
for (int i = 0; i < rows; i++) {
for (int j = 0; j < columns; j++) {
if (V[i][j].state != fmmdist::known) {
unsigned neigh = 0; // mod 16 this represents a specific configuration
ndist.clear();
for (unsigned k = 0; k < dx.size(); k++) {
auto const r = i + dx[k];
auto const c = j + dy[k];
if (r >= rows || r < 0 || c >= columns || c < 0)
continue;
if (V[r][c].state == fmmdist::known) {
neigh |= (1 << k);
using std::abs;
ndist.push_back(abs(phi[i][j]) * ddx / (abs(phi[i][j]) + abs(phi[r][c])));
}
}
if (neigh) {
double dist = pick_distance(ndist, neigh);
// add to narrow band
treedist t;
t.dist = dist;
t.coord = {i, j};
t.ts = 0;
t.known = ndist.size();
V[i][j] = {fmmdist::close, dist, queue.push(t)};
}
}
}
}
}
Similar treatment of the loopFMVel()
function has a similar effect:
// Fast marching loop including simultaneous velocity extension
void loopFMVel() {
Timestamp tsclock = 1;
while (!queue.empty()) {
auto [row, column] = queue.pop();
V[row][column].state = fmmdist::known;
for (unsigned k = 0; k < dx.size(); k++) {
auto const r = row + dx[k];
auto const c = column + dy[k];
if (r >= rows || r < 0 || c >= columns || c < 0)
continue;
auto& cell = V[r][c]; // TODO better name
if (cell.state == fmmdist::known)
continue;
auto dist = computeDist(r, c);
if (cell.state == fmmdist::close) {
if (auto& old = *cell.handle; dist < old.dist) {
cell.dist = dist;
queue.increase(cell.handle, dist);
}
} else if (cell.state == fmmdist::far) {
treedist temp;
temp.coord = {r, c};
temp.dist = dist;
temp.known = 1;
temp.ts = tsclock;
cell.state = fmmdist::close;
cell.dist = dist;
cell.handle = queue.push(temp);
}
}
tsclock += 1;
}
}
Along the way I spotted some potential issues:
Accidentally truncating floating point numbers to integers
Did you mean to use int ::abs(int)
in line 129? Fix it using std::abs
to get double std::abs(double)
:
using std::abs;
ndist.push_back(
abs(phi[i][j]) * ddx /
(abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])));
Inconsistent computeDist
. After all my review the function now reads as:
struct Coef {
double a, b, c;
static Coef make(double d) { return {1.0, -2.0 * d, d * d}; };
};
double computeDist(int row, int column) {
// Determine the x-coefficients
auto closest = [](fmmdist const* next, fmmdist const* prev) {
if (next && next->state != fmmdist::known) next = nullptr;
if (prev && prev->state != fmmdist::known) prev = nullptr;
if (!(next || prev))
return Coef{0, 0, 0}; // TODO is this okay? might need to be Coef::make(INF)?
double d = std::numeric_limits<double>::infinity();
if (next) d = std::min(d, next->dist);
if (prev) d = std::min(d, prev->dist);
return Coef::make(d);
};
Coef x = closest( //
row + 1 < rows ? &V[row + 1][column] : nullptr, //
row > 0 ? &V[row - 1][column] : nullptr);
Coef y = closest( //
column + 1 < columns ? &V[row][column + 1] : nullptr, //
column > 0 ? &V[row][column - 1] : nullptr);
// return the largest root of the quadratic
double a = x.a + y.a;
double b = x.b + y.b;
double c = x.c + y.c - powf(ddx, 2);
double result = quadratic(a, b, c);
assert(!std::isnan(result));
return result;
}
In this form the distinction between rowCoef
and columnCoef
went away. And a potential problem is highlighted:
if (!(next || prev))
return Coef{0, 0, 0}; // TODO is this okay? might need to be Coef::make(INF)?
If no neighbouring row/column is accessible and in the known
, we're defaulting to suddenly different coefficents (0, 0, 0)
, where I'd probably have expected (2, -Inf, Inf)
for consistency. If you agree, we can remove the special-case making the code even more elegant:
auto closest = [](auto*... candidates) {
double dist = INFINITY;
for (auto p : {candidates...})
if (p && p->state == fmmdist::known)
dist = std::min(dist, p->dist);
return Coef{1.0, -2.0 * dist, dist * dist};
};
Comparing your versions, I note that the multiset variant behaves differently: it always increases known
regardless of whether the distance is decreased.
As you've seen above, I assumed that you only want to update like in your heap implementation, to make it a fair comparison.
I refactored the code so I could switch the queue implementation with the same FMM implementation:
struct QueueConcept {
using Handle = /*stable handle type*/;
size_t size() const;
bool empty() const;
Coord pop(); // extract closest
Handle push(treedist v);
void increase(Handle& h, double dist);
};
Which is trivially implemented for both multiset
and d_ary_heap
.
struct DistanceTree {
using Storage = std::multiset<treedist, treedist::Less>;
using Handle = typename Storage::const_iterator; // must be stable
size_t size() const { return _storage.size(); }
bool empty() const { return _storage.empty(); }
Coord pop() { // extract closest
Handle handle = _storage.begin();
Coord coord = handle->coord;
_storage.erase(handle);
return coord;
}
Handle push(treedist v) { return _storage.insert(std::move(v)); }
void increase(Handle& h, double dist) {
treedist v = *h;
_storage.erase(h);
assert(dist <= v.dist);
v.dist = dist;
v.known += 1;
h = push(std::move(v));
}
private:
Storage _storage;
};
struct DistanceHeap {
using Storage = bh::d_ary_heap<treedist, //
bh::compare<treedist::Greater>, bh::mutable_<true>, bh::arity<2>>;
using Handle = Storage::handle_type;
DistanceHeap() { _storage.reserve(2056 * 2056); }
size_t size() const { return _storage.size(); }
bool empty() const { return _storage.empty(); }
Coord pop() { // extract closest
auto& temp = _storage.top();
Coord coord = temp.coord;
_storage.pop();
return coord;
}
Handle push(treedist v) { return _storage.push(std::move(v)); }
void increase(Handle h, double dist) {
auto& v = *h;
assert(dist <= v.dist);
v.dist = dist;
v.known += 1;
_storage.increase(h);
}
private:
Storage _storage;
};
Now I templated the Algorithm
code like so:
template <typename Queue, int rows = 16001, int columns = rows> //
struct Algorithm {
Queue queue; // prio queue used to efficiently store the distances
// .. the rest
};
And added a test-bed:
#include <chrono>
#include <iomanip>
namespace {
static long elapsed() {
auto now = std::chrono::high_resolution_clock::now;
using namespace std::chrono_literals;
static auto start = now();
auto t = now();
return (t - std::exchange(start, t)) / 1ms;
}
void trace(auto const&... args) {
((std::cout << std::setw(10) << elapsed() << "ms ") << ... << args) << std::endl;
}
template <typename Queue> void run() {
trace(__PRETTY_FUNCTION__);
auto a = std::make_unique<Algorithm<Queue, 5601>>();
trace("Constructed");
a->initPhi();
a->initState();
trace("Finished state init ");
a->initialization(true);
trace("Finished init of FMM ");
a->loopFMVel();
trace("Done");
}
} // namespace
int main(int argc, char**) {
std::ios::sync_with_stdio(false);
trace("Start");
if (argc == 1) run<DistanceTree>();
if (argc >= 2) run<DistanceHeap>();
if (argc == 3) run<DistanceTree>();
trace("Exit");
}
#include <array>
#include <cassert>
#include <cmath>
#include <limits>
#include <memory>
#include <set>
#include <tuple>
#include <utility>
#include <vector>
#include <boost/container/static_vector.hpp>
#include <boost/heap/d_ary_heap.hpp>
namespace bh = boost::heap;
// Define epsilon value
static inline constexpr double EPS = 1e-10;
#ifndef USE_VECTOR
template <typename T, size_t C> using Row = std::array<T, C>;
template <typename T, size_t R, size_t C> using Matrix = std::array<Row<T, C>, R>;
#else
template <typename T, size_t R, size_t C> struct Matrix {
std::vector<std::vector<T>> impl_;
Matrix() : impl_(R, std::vector<T>(C, T{})) {}
auto& operator[](size_t n) const { return impl_[n]; }
auto& operator[](size_t n) { return impl_[n]; }
};
#endif
using Timestamp = unsigned long;
struct Coord { int row, col; };
struct treedist {
double dist;
int known;
Timestamp ts;
Coord coord;
private:
constexpr auto key() const { // natural order by nearest, greatest known and earliest ts
return std::tuple(dist, -known, ts);
}
public:
struct Less {
constexpr bool operator()(treedist const& a, treedist const& b) const { return a.key() < b.key(); }
};
struct Greater {
constexpr bool operator()(treedist const& a, treedist const& b) const { return a.key() > b.key(); }
};
};
struct DistanceTree {
using Storage = std::multiset<treedist, treedist::Less>;
using Handle = typename Storage::const_iterator; // must be stable
size_t size() const { return _storage.size(); }
bool empty() const { return _storage.empty(); }
Coord pop() { // extract closest
Handle handle = _storage.begin();
Coord coord = handle->coord;
_storage.erase(handle);
return coord;
}
Handle push(treedist v) { return _storage.insert(std::move(v)); }
void increase(Handle& h, double dist) {
treedist v = *h;
_storage.erase(h);
assert(dist <= v.dist);
v.dist = dist;
v.known += 1;
h = push(std::move(v));
}
private:
Storage _storage;
};
struct DistanceHeap {
using Storage = bh::d_ary_heap<treedist, //
bh::compare<treedist::Greater>, bh::mutable_<true>, bh::arity<2>>;
using Handle = Storage::handle_type;
DistanceHeap() { _storage.reserve(2056 * 2056); }
size_t size() const { return _storage.size(); }
bool empty() const { return _storage.empty(); }
Coord pop() { // extract closest
auto& temp = _storage.top();
Coord coord = temp.coord;
_storage.pop();
return coord;
}
Handle push(treedist v) { return _storage.push(std::move(v)); }
void increase(Handle h, double dist) {
auto& v = *h;
assert(dist <= v.dist);
v.dist = dist;
v.known += 1;
_storage.increase(h);
}
private:
Storage _storage;
};
template <typename Queue, int rows = 16001, int columns = rows> //
struct Algorithm {
Queue queue; // prio queue used to efficiently store the distances
struct fmmdist {
enum State { far = 0, close = 1, known = 2 };
State state{far};
double dist{-1};
typename Queue::Handle handle{};
};
// phi array
Matrix<double, rows, columns> phi;
// Matrix representing state of point in fast marching method
Matrix<fmmdist, rows, columns> V;
// Spatial step
static double constexpr ddx = 1e-3;
static constexpr std::array<int, 4> dx{0, 1, 0, -1};
static constexpr std::array<int, 4> dy{1, 0, -1, 0};
// initialize phi array for testing purposes
inline void initPhi() {
for (int i = 0; i < rows; i++)
for (int j = 0; j < columns; j++)
phi[i][j] = sqrt(pow(-8 + ddx * i, 2) + pow(-8 + ddx * j, 2)) - 4;
}
// Initialize the state array
void initState() {
for (int i = 0; i < rows; i++)
for (int j = 0; j < columns; j++)
V[i][j] = {fmmdist::far, -1, {}};
}
// return largest root of quadratic with coef. a, b, c
// This assumes delta >= 0 (this holds for FMM method)
static double quadratic(double a, double b, double c) {
double delta = b * b - 4 * a * c;
if (delta > 0) {
double r1 = (-b + std::sqrt(delta)) / (2 * a);
double r2 = (-b - std::sqrt(delta)) / (2 * a);
return std::max(r1, r2);
}
return -b / (2 * a);
}
using NDist = boost::container::static_vector<double, 4>;
static double pick_distance(NDist const& ndist, unsigned configuration) {
if (ndist.size() == 1)
return ndist[0]; // case a
auto q_hv = [](double h, double v) {
return quadratic(2, 0, -(h * h * v * v) / (h * h + v * v));
};
switch (configuration) {
case 3: case 12:
case 6: case 9: return q_hv(ndist[0], ndist[1]); // case b
case 5: case 10: return std::min(ndist[0], ndist[1]); // case d
case 13: return q_hv(ndist[2], std::min(ndist[0], ndist[1])); // case c, both vertical
case 7: return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both vertical
case 11: return q_hv(ndist[0], std::min(ndist[1], ndist[2])); // case c, both horizontal
case 14: return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both horizontal
case 15: return q_hv(std::min(ndist[1], ndist[3]), std::min(ndist[0], ndist[2])); // case e
default: return 0;
}
};
// Initialization of the fast marching method
void initialization(bool inside) {
// inside means we tag the points within the interface as known
for (int i = 0; i < rows; i++)
for (int j = 0; j < columns; j++)
if (inside != (phi[i][j] >= EPS)) { // bound used to be used here...
V[i][j].state = fmmdist::known;
V[i][j].dist = 0;
}
// find initial close points
// see paper by James Sethian
NDist ndist;
for (int i = 0; i < rows; i++) {
for (int j = 0; j < columns; j++) {
if (V[i][j].state != fmmdist::known) {
unsigned neigh = 0; // mod 16 this represents a specific configuration
ndist.clear();
for (unsigned k = 0; k < dx.size(); k++) {
auto const r = i + dx[k];
auto const c = j + dy[k];
if (r >= rows || r < 0 || c >= columns || c < 0)
continue;
if (V[r][c].state == fmmdist::known) {
neigh |= (1 << k);
using std::abs;
ndist.push_back(abs(phi[i][j]) * ddx / (abs(phi[i][j]) + abs(phi[r][c])));
}
}
if (neigh) {
double dist = pick_distance(ndist, neigh);
// add to narrow band
treedist t;
t.dist = dist;
t.coord = {i, j};
t.ts = 0;
t.known = ndist.size();
V[i][j] = {fmmdist::close, dist, queue.push(t)};
}
}
}
}
}
struct Coef {
double a, b, c;
};
double computeDist(int row, int column) {
// Determine the x-coefficients
auto closest = [](auto*... candidates) {
double dist = INFINITY;
for (auto p : {candidates...})
if (p && p->state == fmmdist::known)
dist = std::min(dist, p->dist);
return Coef{1.0, -2.0 * dist, dist * dist};
};
Coef x = closest( //
row + 1 < rows ? &V[row + 1][column] : nullptr, //
row > 0 ? &V[row - 1][column] : nullptr);
Coef y = closest( //
column + 1 < columns ? &V[row][column + 1] : nullptr, //
column > 0 ? &V[row][column - 1] : nullptr);
// return the largest root of the quadratic
double a = x.a + y.a;
double b = x.b + y.b;
double c = x.c + y.c - powf(ddx, 2);
double result = quadratic(a, b, c);
assert(!std::isnan(result));
return result;
}
// Fast marching loop including simultaneous velocity extension
void loopFMVel() {
Timestamp tsclock = 1;
while (!queue.empty()) {
auto [row, column] = queue.pop();
V[row][column].state = fmmdist::known;
for (unsigned k = 0; k < dx.size(); k++) {
auto const r = row + dx[k];
auto const c = column + dy[k];
if (r >= rows || r < 0 || c >= columns || c < 0)
continue;
auto& cell = V[r][c]; // TODO better name
if (cell.state == fmmdist::known)
continue;
auto dist = computeDist(r, c);
if (cell.state == fmmdist::close) {
if (auto& old = *cell.handle; dist < old.dist) {
cell.dist = dist;
queue.increase(cell.handle, dist);
}
} else if (cell.state == fmmdist::far) {
treedist temp;
temp.coord = {r, c};
temp.dist = dist;
temp.known = 1;
temp.ts = tsclock;
cell.state = fmmdist::close;
cell.dist = dist;
cell.handle = queue.push(temp);
}
}
tsclock += 1;
}
}
};
#include <chrono>
#include <iomanip>
#include <iostream>
namespace {
static long elapsed() {
auto now = std::chrono::high_resolution_clock::now;
using namespace std::chrono_literals;
static auto start = now();
auto t = now();
return (t - std::exchange(start, t)) / 1ms;
}
void trace(auto const&... args) {
((std::cout << std::setw(10) << elapsed() << "ms ") << ... << args) << std::endl;
}
template <typename Queue> void run() {
trace(__PRETTY_FUNCTION__);
auto a = std::make_unique<Algorithm<Queue, 5601>>();
trace("Constructed");
a->initPhi();
a->initState();
trace("Finished state init ");
a->initialization(true);
trace("Finished init of FMM ");
a->loopFMVel();
trace("Done");
}
} // namespace
int main(int argc, char**) {
std::ios::sync_with_stdio(false);
trace("Start");
if (argc == 1) run<DistanceTree>();
if (argc >= 2) run<DistanceHeap>();
if (argc == 3) run<DistanceTree>();
trace("Exit");
}
Printing (online):
g++ -std=c++20 -O3 -DNDEBUG -ffast-math -Wall -pedantic -pthread main.cpp && ./a.out heap tree
0ms Start
0ms void {anonymous}::run() [with Queue = DistanceHeap]
1529ms Constructed
263ms Finished state init
228ms Finished init of FMM
10523ms Done
267ms void {anonymous}::run() [with Queue = DistanceTree]
1204ms Constructed
282ms Finished state init
225ms Finished init of FMM
8305ms Done
262ms Exit
Local demo: