I have worked on implementing the fast marching method. It's a computational method to solve a special type of differential equation. In particular, this code solves the equation $$|\nabla phi| = 1$$ for $$\phi$$ given a boundary condition where $$\phi = 0$$. Anyways, to achieve a O(n lg n) running time requires the use of a priority queue supporting get_min(), extract_min(), decrease_key() and insert() (or push()). Moreover, the algorithm in question will use n insert() operations, n extract_min() operations and at worst 4n decrease_key() operations. Now, it would seem to me that heap such as the fibonacci_heap in the boost library would outperform an std set by a lot which supports the same operations (decrease key is implemented by erasing the element and readding it). However, this is not the case and I was wondering why?
(I would like to note that it is not possible to use the std priority_queue, as it does not support decrease_key())
Here is the code using a d-ary-heap:
Here is code using a multiset:
The following are some results from my testing:
N = 16001^2 (I ran these tests on an M1 max with flags -Ofast -fno-finite-math-only -march=armv8.5-a -mcpu=native -ffast-math) binary_heap (with reserved memory): 58.35s multiset: 63.33s fibonacci_heap: 73.43 16_ary_heap (with reserved memory): 65.40s pairing_heap: 116.5s
I profiled using time. If any more details are required, I will gladly add them.
I spent an inordinate amount of time refactoring the code so I could actually understand it.
While reading through I made some changes that
avoid repeated code or subexpressions (which are very error-prone especially in copy-pasted stuff)
tried to find "meaningful" names.
removed magic constants (if you can comment them, you can use an enum)
avoid dynamic allocation and incremental insertions. Unless you define USE_VECTOR
my version uses std::array
to allocate everything up-front:
#ifndef USE_VECTOR
template <typename T, size_t C> using Row = std::array<T, C>;
template <typename T, size_t R, size_t C> using Matrix = std::array<Row<T, C>, R>;
template <typename T, size_t R, size_t C> struct Matrix {
std::vector<std::vector<T>> impl_;
Matrix() : impl_(R, std::vector<T>(C, T{})) {}
auto& operator[](size_t n) const { return impl_[n]; }
auto& operator[](size_t n) { return impl_[n]; }
Which is used for both the phi
and V
// phi array
Matrix<double, rows, columns> phi;
// Matrix representing state of point in fast marching method
Matrix<fmmdist, rows, columns> V;
avoid C-isms like sqrtf
, prefer std::min
over manual compare and swap
I rewrote the branches in initialization
to always use the same path for the quadratic
double dist = pick_distance(ndist, neigh);
Which is implemented (also using a static_vector<double, 4>
for the max 4 neighboring distances):
using NDist = boost::container::static_vector<double, 4>;
static double pick_distance(NDist const& ndist, unsigned configuration) {
if (ndist.size() == 1)
return ndist[0]; // case a
auto q_hv = [](double h, double v) {
return quadratic(2, 0, -(h * h * v * v) / (h * h + v * v));
switch (configuration) {
case 3: case 12:
case 6: case 9: return q_hv(ndist[0], ndist[1]); // case b
case 5: case 10: return std::min(ndist[0], ndist[1]); // case d
case 13: return q_hv(ndist[2], std::min(ndist[0], ndist[1])); // case c, both vertical
case 7: return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both vertical
case 11: return q_hv(ndist[0], std::min(ndist[1], ndist[2])); // case c, both horizontal
case 14: return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both horizontal
case 15: return q_hv(std::min(ndist[1], ndist[3]), std::min(ndist[0], ndist[2])); // case e
default: return 0;
adding the bits to the neigh
bitmask (which is a clever trick, by the way) can be done safer and more effectively:
neigh |= (1 << k); // instead of neigh += pow(2,k);
the various variables known
and ndist
all have pretty redundant state, adhering to the following invariants:
assert(known >= 0);
assert(std::popcount(neigh) == known);
assert(known == static_cast<int>(ndist.size()));
assert((neigh != 0) == (known != 0));
assert((neigh == 0) == ndist.empty());
I'd suggest dropping some of the unused values. With these, and extracting the repeated sub-expressions, the initialization
function becomes much more readable:
// Initialization of the fast marching method
void initialization(bool inside) {
// inside means we tag the points within the interface as known
for (int i = 0; i < rows; i++)
for (int j = 0; j < columns; j++)
if (inside != (phi[i][j] >= EPS)) { // bound used to be used here...
V[i][j].state = fmmdist::known;
V[i][j].dist = 0;
// find initial close points
// see paper by James Sethian
NDist ndist;
for (int i = 0; i < rows; i++) {
for (int j = 0; j < columns; j++) {
if (V[i][j].state != fmmdist::known) {
unsigned neigh = 0; // mod 16 this represents a specific configuration
for (unsigned k = 0; k < dx.size(); k++) {
auto const r = i + dx[k];
auto const c = j + dy[k];
if (r >= rows || r < 0 || c >= columns || c < 0)
if (V[r][c].state == fmmdist::known) {
neigh |= (1 << k);
using std::abs;
ndist.push_back(abs(phi[i][j]) * ddx / (abs(phi[i][j]) + abs(phi[r][c])));
if (neigh) {
double dist = pick_distance(ndist, neigh);
// add to narrow band
treedist t;
t.dist = dist;
t.coord = {i, j};
t.ts = 0;
t.known = ndist.size();
V[i][j] = {fmmdist::close, dist, queue.push(t)};
Similar treatment of the loopFMVel()
function has a similar effect:
// Fast marching loop including simultaneous velocity extension
void loopFMVel() {
Timestamp tsclock = 1;
while (!queue.empty()) {
auto [row, column] = queue.pop();
V[row][column].state = fmmdist::known;
for (unsigned k = 0; k < dx.size(); k++) {
auto const r = row + dx[k];
auto const c = column + dy[k];
if (r >= rows || r < 0 || c >= columns || c < 0)
auto& cell = V[r][c]; // TODO better name
if (cell.state == fmmdist::known)
auto dist = computeDist(r, c);
if (cell.state == fmmdist::close) {
if (auto& old = *cell.handle; dist < old.dist) {
cell.dist = dist;
queue.increase(cell.handle, dist);
} else if (cell.state == fmmdist::far) {
treedist temp;
temp.coord = {r, c};
temp.dist = dist;
temp.known = 1;
temp.ts = tsclock;
cell.state = fmmdist::close;
cell.dist = dist;
cell.handle = queue.push(temp);
tsclock += 1;
Along the way I spotted some potential issues:
Accidentally truncating floating point numbers to integers
Did you mean to use int ::abs(int)
in line 129? Fix it using std::abs
to get double std::abs(double)
using std::abs;
abs(phi[i][j]) * ddx /
(abs(phi[i][j]) + abs(phi[i + dx[k]][j + dy[k]])));
Inconsistent computeDist
. After all my review the function now reads as:
struct Coef {
double a, b, c;
static Coef make(double d) { return {1.0, -2.0 * d, d * d}; };
double computeDist(int row, int column) {
// Determine the x-coefficients
auto closest = [](fmmdist const* next, fmmdist const* prev) {
if (next && next->state != fmmdist::known) next = nullptr;
if (prev && prev->state != fmmdist::known) prev = nullptr;
if (!(next || prev))
return Coef{0, 0, 0}; // TODO is this okay? might need to be Coef::make(INF)?
double d = std::numeric_limits<double>::infinity();
if (next) d = std::min(d, next->dist);
if (prev) d = std::min(d, prev->dist);
return Coef::make(d);
Coef x = closest( //
row + 1 < rows ? &V[row + 1][column] : nullptr, //
row > 0 ? &V[row - 1][column] : nullptr);
Coef y = closest( //
column + 1 < columns ? &V[row][column + 1] : nullptr, //
column > 0 ? &V[row][column - 1] : nullptr);
// return the largest root of the quadratic
double a = x.a + y.a;
double b = x.b + y.b;
double c = x.c + y.c - powf(ddx, 2);
double result = quadratic(a, b, c);
return result;
In this form the distinction between rowCoef
and columnCoef
went away. And a potential problem is highlighted:
if (!(next || prev))
return Coef{0, 0, 0}; // TODO is this okay? might need to be Coef::make(INF)?
If no neighbouring row/column is accessible and in the known
, we're defaulting to suddenly different coefficents (0, 0, 0)
, where I'd probably have expected (2, -Inf, Inf)
for consistency. If you agree, we can remove the special-case making the code even more elegant:
auto closest = [](auto*... candidates) {
double dist = INFINITY;
for (auto p : {candidates...})
if (p && p->state == fmmdist::known)
dist = std::min(dist, p->dist);
return Coef{1.0, -2.0 * dist, dist * dist};
Comparing your versions, I note that the multiset variant behaves differently: it always increases known
regardless of whether the distance is decreased.
As you've seen above, I assumed that you only want to update like in your heap implementation, to make it a fair comparison.
I refactored the code so I could switch the queue implementation with the same FMM implementation:
struct QueueConcept {
using Handle = /*stable handle type*/;
size_t size() const;
bool empty() const;
Coord pop(); // extract closest
Handle push(treedist v);
void increase(Handle& h, double dist);
Which is trivially implemented for both multiset
and d_ary_heap
struct DistanceTree {
using Storage = std::multiset<treedist, treedist::Less>;
using Handle = typename Storage::const_iterator; // must be stable
size_t size() const { return _storage.size(); }
bool empty() const { return _storage.empty(); }
Coord pop() { // extract closest
Handle handle = _storage.begin();
Coord coord = handle->coord;
return coord;
Handle push(treedist v) { return _storage.insert(std::move(v)); }
void increase(Handle& h, double dist) {
treedist v = *h;
assert(dist <= v.dist);
v.dist = dist;
v.known += 1;
h = push(std::move(v));
Storage _storage;
struct DistanceHeap {
using Storage = bh::d_ary_heap<treedist, //
bh::compare<treedist::Greater>, bh::mutable_<true>, bh::arity<2>>;
using Handle = Storage::handle_type;
DistanceHeap() { _storage.reserve(2056 * 2056); }
size_t size() const { return _storage.size(); }
bool empty() const { return _storage.empty(); }
Coord pop() { // extract closest
auto& temp = _storage.top();
Coord coord = temp.coord;
return coord;
Handle push(treedist v) { return _storage.push(std::move(v)); }
void increase(Handle h, double dist) {
auto& v = *h;
assert(dist <= v.dist);
v.dist = dist;
v.known += 1;
Storage _storage;
Now I templated the Algorithm
code like so:
template <typename Queue, int rows = 16001, int columns = rows> //
struct Algorithm {
Queue queue; // prio queue used to efficiently store the distances
// .. the rest
And added a test-bed:
#include <chrono>
#include <iomanip>
namespace {
static long elapsed() {
auto now = std::chrono::high_resolution_clock::now;
using namespace std::chrono_literals;
static auto start = now();
auto t = now();
return (t - std::exchange(start, t)) / 1ms;
void trace(auto const&... args) {
((std::cout << std::setw(10) << elapsed() << "ms ") << ... << args) << std::endl;
template <typename Queue> void run() {
auto a = std::make_unique<Algorithm<Queue, 5601>>();
trace("Finished state init ");
trace("Finished init of FMM ");
} // namespace
int main(int argc, char**) {
if (argc == 1) run<DistanceTree>();
if (argc >= 2) run<DistanceHeap>();
if (argc == 3) run<DistanceTree>();
#include <array>
#include <cassert>
#include <cmath>
#include <limits>
#include <memory>
#include <set>
#include <tuple>
#include <utility>
#include <vector>
#include <boost/container/static_vector.hpp>
#include <boost/heap/d_ary_heap.hpp>
namespace bh = boost::heap;
// Define epsilon value
static inline constexpr double EPS = 1e-10;
#ifndef USE_VECTOR
template <typename T, size_t C> using Row = std::array<T, C>;
template <typename T, size_t R, size_t C> using Matrix = std::array<Row<T, C>, R>;
template <typename T, size_t R, size_t C> struct Matrix {
std::vector<std::vector<T>> impl_;
Matrix() : impl_(R, std::vector<T>(C, T{})) {}
auto& operator[](size_t n) const { return impl_[n]; }
auto& operator[](size_t n) { return impl_[n]; }
using Timestamp = unsigned long;
struct Coord { int row, col; };
struct treedist {
double dist;
int known;
Timestamp ts;
Coord coord;
constexpr auto key() const { // natural order by nearest, greatest known and earliest ts
return std::tuple(dist, -known, ts);
struct Less {
constexpr bool operator()(treedist const& a, treedist const& b) const { return a.key() < b.key(); }
struct Greater {
constexpr bool operator()(treedist const& a, treedist const& b) const { return a.key() > b.key(); }
struct DistanceTree {
using Storage = std::multiset<treedist, treedist::Less>;
using Handle = typename Storage::const_iterator; // must be stable
size_t size() const { return _storage.size(); }
bool empty() const { return _storage.empty(); }
Coord pop() { // extract closest
Handle handle = _storage.begin();
Coord coord = handle->coord;
return coord;
Handle push(treedist v) { return _storage.insert(std::move(v)); }
void increase(Handle& h, double dist) {
treedist v = *h;
assert(dist <= v.dist);
v.dist = dist;
v.known += 1;
h = push(std::move(v));
Storage _storage;
struct DistanceHeap {
using Storage = bh::d_ary_heap<treedist, //
bh::compare<treedist::Greater>, bh::mutable_<true>, bh::arity<2>>;
using Handle = Storage::handle_type;
DistanceHeap() { _storage.reserve(2056 * 2056); }
size_t size() const { return _storage.size(); }
bool empty() const { return _storage.empty(); }
Coord pop() { // extract closest
auto& temp = _storage.top();
Coord coord = temp.coord;
return coord;
Handle push(treedist v) { return _storage.push(std::move(v)); }
void increase(Handle h, double dist) {
auto& v = *h;
assert(dist <= v.dist);
v.dist = dist;
v.known += 1;
Storage _storage;
template <typename Queue, int rows = 16001, int columns = rows> //
struct Algorithm {
Queue queue; // prio queue used to efficiently store the distances
struct fmmdist {
enum State { far = 0, close = 1, known = 2 };
State state{far};
double dist{-1};
typename Queue::Handle handle{};
// phi array
Matrix<double, rows, columns> phi;
// Matrix representing state of point in fast marching method
Matrix<fmmdist, rows, columns> V;
// Spatial step
static double constexpr ddx = 1e-3;
static constexpr std::array<int, 4> dx{0, 1, 0, -1};
static constexpr std::array<int, 4> dy{1, 0, -1, 0};
// initialize phi array for testing purposes
inline void initPhi() {
for (int i = 0; i < rows; i++)
for (int j = 0; j < columns; j++)
phi[i][j] = sqrt(pow(-8 + ddx * i, 2) + pow(-8 + ddx * j, 2)) - 4;
// Initialize the state array
void initState() {
for (int i = 0; i < rows; i++)
for (int j = 0; j < columns; j++)
V[i][j] = {fmmdist::far, -1, {}};
// return largest root of quadratic with coef. a, b, c
// This assumes delta >= 0 (this holds for FMM method)
static double quadratic(double a, double b, double c) {
double delta = b * b - 4 * a * c;
if (delta > 0) {
double r1 = (-b + std::sqrt(delta)) / (2 * a);
double r2 = (-b - std::sqrt(delta)) / (2 * a);
return std::max(r1, r2);
return -b / (2 * a);
using NDist = boost::container::static_vector<double, 4>;
static double pick_distance(NDist const& ndist, unsigned configuration) {
if (ndist.size() == 1)
return ndist[0]; // case a
auto q_hv = [](double h, double v) {
return quadratic(2, 0, -(h * h * v * v) / (h * h + v * v));
switch (configuration) {
case 3: case 12:
case 6: case 9: return q_hv(ndist[0], ndist[1]); // case b
case 5: case 10: return std::min(ndist[0], ndist[1]); // case d
case 13: return q_hv(ndist[2], std::min(ndist[0], ndist[1])); // case c, both vertical
case 7: return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both vertical
case 11: return q_hv(ndist[0], std::min(ndist[1], ndist[2])); // case c, both horizontal
case 14: return q_hv(ndist[1], std::min(ndist[0], ndist[2])); // case c, both horizontal
case 15: return q_hv(std::min(ndist[1], ndist[3]), std::min(ndist[0], ndist[2])); // case e
default: return 0;
// Initialization of the fast marching method
void initialization(bool inside) {
// inside means we tag the points within the interface as known
for (int i = 0; i < rows; i++)
for (int j = 0; j < columns; j++)
if (inside != (phi[i][j] >= EPS)) { // bound used to be used here...
V[i][j].state = fmmdist::known;
V[i][j].dist = 0;
// find initial close points
// see paper by James Sethian
NDist ndist;
for (int i = 0; i < rows; i++) {
for (int j = 0; j < columns; j++) {
if (V[i][j].state != fmmdist::known) {
unsigned neigh = 0; // mod 16 this represents a specific configuration
for (unsigned k = 0; k < dx.size(); k++) {
auto const r = i + dx[k];
auto const c = j + dy[k];
if (r >= rows || r < 0 || c >= columns || c < 0)
if (V[r][c].state == fmmdist::known) {
neigh |= (1 << k);
using std::abs;
ndist.push_back(abs(phi[i][j]) * ddx / (abs(phi[i][j]) + abs(phi[r][c])));
if (neigh) {
double dist = pick_distance(ndist, neigh);
// add to narrow band
treedist t;
t.dist = dist;
t.coord = {i, j};
t.ts = 0;
t.known = ndist.size();
V[i][j] = {fmmdist::close, dist, queue.push(t)};
struct Coef {
double a, b, c;
double computeDist(int row, int column) {
// Determine the x-coefficients
auto closest = [](auto*... candidates) {
double dist = INFINITY;
for (auto p : {candidates...})
if (p && p->state == fmmdist::known)
dist = std::min(dist, p->dist);
return Coef{1.0, -2.0 * dist, dist * dist};
Coef x = closest( //
row + 1 < rows ? &V[row + 1][column] : nullptr, //
row > 0 ? &V[row - 1][column] : nullptr);
Coef y = closest( //
column + 1 < columns ? &V[row][column + 1] : nullptr, //
column > 0 ? &V[row][column - 1] : nullptr);
// return the largest root of the quadratic
double a = x.a + y.a;
double b = x.b + y.b;
double c = x.c + y.c - powf(ddx, 2);
double result = quadratic(a, b, c);
return result;
// Fast marching loop including simultaneous velocity extension
void loopFMVel() {
Timestamp tsclock = 1;
while (!queue.empty()) {
auto [row, column] = queue.pop();
V[row][column].state = fmmdist::known;
for (unsigned k = 0; k < dx.size(); k++) {
auto const r = row + dx[k];
auto const c = column + dy[k];
if (r >= rows || r < 0 || c >= columns || c < 0)
auto& cell = V[r][c]; // TODO better name
if (cell.state == fmmdist::known)
auto dist = computeDist(r, c);
if (cell.state == fmmdist::close) {
if (auto& old = *cell.handle; dist < old.dist) {
cell.dist = dist;
queue.increase(cell.handle, dist);
} else if (cell.state == fmmdist::far) {
treedist temp;
temp.coord = {r, c};
temp.dist = dist;
temp.known = 1;
temp.ts = tsclock;
cell.state = fmmdist::close;
cell.dist = dist;
cell.handle = queue.push(temp);
tsclock += 1;
#include <chrono>
#include <iomanip>
#include <iostream>
namespace {
static long elapsed() {
auto now = std::chrono::high_resolution_clock::now;
using namespace std::chrono_literals;
static auto start = now();
auto t = now();
return (t - std::exchange(start, t)) / 1ms;
void trace(auto const&... args) {
((std::cout << std::setw(10) << elapsed() << "ms ") << ... << args) << std::endl;
template <typename Queue> void run() {
auto a = std::make_unique<Algorithm<Queue, 5601>>();
trace("Finished state init ");
trace("Finished init of FMM ");
} // namespace
int main(int argc, char**) {
if (argc == 1) run<DistanceTree>();
if (argc >= 2) run<DistanceHeap>();
if (argc == 3) run<DistanceTree>();
Printing (online):
g++ -std=c++20 -O3 -DNDEBUG -ffast-math -Wall -pedantic -pthread main.cpp && ./a.out heap tree
0ms Start
0ms void {anonymous}::run() [with Queue = DistanceHeap]
1529ms Constructed
263ms Finished state init
228ms Finished init of FMM
10523ms Done
267ms void {anonymous}::run() [with Queue = DistanceTree]
1204ms Constructed
282ms Finished state init
225ms Finished init of FMM
8305ms Done
262ms Exit
Local demo: