c++performanceoptimizationk-meansbranchless

# Branchless K-means (or other optimizations)

Note: I'd appreciate more of a guide to how to approach and come up with these kinds of solutions rather than the solution itself.

I have a very performance-critical function in my system showing up as a number one profiling hotspot in specific contexts. It's in the middle of a k-means iteration (already multi-threaded using a parallel for processing sub-ranges of points in each worker thread).

``````ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i=0; i < num_centroids; ++i)
{
const ClusterCentroid& cent = centroids[i];
const float dist = ...;
if (dist < pt.min_dist) // <-- #1 hotspot
{
pt.min_dist = dist;
pt.min_index = i;
}
}
``````

Any savings in the time required to process this section of code counts substantially, so I've often been fiddling with it a lot. It might be worth putting the centroid loop outside, for example, and iterate through the points in parallel for a given centroid. The number of cluster points here spans in the millions, while the number of centroids spans in the thousands. The algorithm is applied for a handful of iterations (often under 10). It doesn't seek perfect convergence/stability, just some 'reasonable' approximation.

Any ideas are appreciated, but what I'm really eager to discover is if this code can be made branchless as it would allow for a SIMD version. I haven't really developed the kind of mental ability to easily grasp how to come up with branchless solutions: my brain fails there much like it did when I was first exposed to recursion in the early days, so a guide on how to write branchless code and how to develop the appropriate mindset for it would also be helpful.

In short, I'm looking for any guides and hints and suggestions (not necessarily solutions) on how to micro-optimize this code. It most likely has room for algorithmic improvements, but my blindspot has always been in micro-optimization solutions (and I'm curious to learn how to apply them more effectively without going overboard with it). It's already tightly multithreaded with chunky parallel for logic, so I'm pretty much pushed into the micro-optimization corner as one of the quicker things to try without a smarter algorithm outright. We're completely free to change the memory layout.

## In Response to Algorithmic Suggestions

About looking at this all wrong in seeking to micro-optimize an O(knm) algorithm which could clearly be improved at the algorithmic level, I wholeheartedly agree. This pushes this specific question into a somewhat academic and impractical realm. However, if I could be allowed an anecdote, I come from an original background of high-level programming -- big emphasis on broad, large-scale viewpoint, safety, and very little on the low-level implementation details. I've recently switched projects to a very different kind of modern-flavored one and I'm learning all kinds of new tricks from my peers of cache efficiency, GPGPU, branchless techniques, SIMD, special-purpose mem allocators that actually outperform malloc (but for specific scenarios), etc.

It's where I'm trying to catch up with the latest performance trends, and surprisingly I've found that those old data structures I often favored during the 90s which were often linked/tree-type structures are actually being vastly outperformed by much more naive, brutish, micro-optimized, parallelized code applying tuned instructions over contiguous memory blocks. It's somewhat disappointing at the same time since I feel like we're fitting the algorithms more to the machine now and narrowing the possibilities this way (especially with GPGPU).

The funniest thing is that I find this type of micro-optimized, fast array-processing code much easier to maintain than the sophisticated algorithms and data structures I was using before. For a start, they're easier to generalize. Furthermore, my peers can often take a customer complaint about a specific slowdown in an area, just slap a parallel for and possibly some SIMD and call it done with a decent speed up. Algorithmic improvements can often offer substantially more, but the speed and non-intrusiveness at which these micro-optimizations can be applied has me wanting to learn more in that area, as reading papers on better algorithms can take some time (as well as require more extensive changes). So I've been jumping on that micro-optimization bandwagon a bit more lately, and perhaps a little too much in this specific case, but my curiosity is more about expanding my range of possible solutions for any scenario.

## Disassembly

Note: I am really, really bad at assembly so I have often tuned things more in a trial and error kind of way, coming up with somewhat educated guesses about why a hotspot shown in vtune might be the bottleneck and then trying things out to see if the times improve, assuming the guesses have some hint of truth if the times do improve, or completely missed the mark if they don't.

``````000007FEEE3FB8A1  jl          thread_partition+70h (7FEEE3FB780h)
{
ClusterPoint& pt = points[j];
pt.min_index = -1;
pt.min_dist = numeric_limits<float>::max();
for (int i = 0; i < num_centroids; ++i)
000007FEEE3FB8A7  cmp         ecx,r10d
000007FEEE3FB8AC  lea         rax,[rbx+rbx*2]
000007FEEE3FB8B3  lea         r8,[rbp+rax*8+8]
{
const ClusterCentroid& cent = centroids[i];
const float x = pt.pos[0] - cent.pos[0];
const float y = pt.pos[1] - cent.pos[1];
000007FEEE3FB8B8  movss       xmm0,dword ptr [rdx]
const float z = pt.pos[2] - cent.pos[2];
000007FEEE3FB8BC  movss       xmm2,dword ptr [rdx+4]
000007FEEE3FB8C1  movss       xmm1,dword ptr [rdx-4]
000007FEEE3FB8C6  subss       xmm2,dword ptr [r8]
000007FEEE3FB8CB  subss       xmm0,dword ptr [r8-4]
000007FEEE3FB8D1  subss       xmm1,dword ptr [r8-8]
const float dist = x*x + y*y + z*z;
000007FEEE3FB8D7  mulss       xmm2,xmm2
000007FEEE3FB8DB  mulss       xmm0,xmm0
000007FEEE3FB8DF  mulss       xmm1,xmm1

if (dist < pt.min_dist)
// VTUNE HOTSPOT
000007FEEE3FB8EB  comiss      xmm2,dword ptr [rdx-8]
{
pt.min_dist = dist;
000007FEEE3FB8F1  movss       dword ptr [rdx-8],xmm2
pt.min_index = i;
000007FEEE3FB8F6  mov         dword ptr [rdx-10h],ecx
000007FEEE3FB8F9  inc         ecx
000007FEEE3FB8FF  cmp         ecx,r10d
for (int j = *irange.first; j < *irange.last; ++j)
000007FEEE3FB904  inc         edi
000007FEEE3FB90A  cmp         edi,dword ptr [rsi+4]
000007FEEE3FB913  mov         rbx,qword ptr [irange]
}
}
}
}
``````

We're forced into targeting SSE 2 -- a bit behind on our times, but the user base actually tripped up once when we assumed that even SSE 4 was okay as a min requirement (the user had some prototype Intel machine).

## Update with Standalone Test: ~5.6 secs

I'm very appreciative of all the help being offered! Because the codebase is quite extensive and the conditions for triggering that code are complex (system events triggered across multiple threads), it's a bit unwieldy to make experimental changes and profile them each time. So I've set up a superficial test on the side as a standalone application that others can also run and try out so that I can experiment with all these graciously offered solutions.

``````#define _SECURE_SCL 0
#include <iostream>
#include <fstream>
#include <vector>
#include <limits>
#include <ctime>
#if defined(_MSC_VER)
#define ALIGN16 __declspec(align(16))
#else
#include <malloc.h>
#define ALIGN16 __attribute__((aligned(16)))
#endif

using namespace std;

// Aligned memory allocation (for SIMD).
static void* malloc16(size_t amount)
{
#ifdef _MSC_VER
return _aligned_malloc(amount, 16);
#else
void* mem = 0;
posix_memalign(&mem, 16, amount);
return mem;
#endif
}
template <class T>
static T* malloc16_t(size_t num_elements)
{
return static_cast<T*>(malloc16(num_elements * sizeof(T)));
}

// Aligned free.
static void free16(void* mem)
{
#ifdef _MSC_VER
return _aligned_free(mem);
#else
free(mem);
#endif
}

// Test parameters.
enum {num_centroids = 512};
enum {num_points = num_centroids * 2000};
enum {num_iterations = 5};
static const float range = 10.0f;

class Points
{
public:
Points(): data(malloc16_t<Point>(num_points))
{
for (int p=0; p < num_points; ++p)
{
const float xyz[3] =
{
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX,
range * static_cast<float>(rand()) / RAND_MAX
};
init(p, xyz);
}
}
~Points()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].centroid = -1;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void associate(int n, int new_centroid)
{
data[n].centroid = new_centroid;
}
int centroid(int n) const
{
return data[n].centroid;
}
float* operator[](int n)
{
return data[n].xyz;
}

private:
Points(const Points&);
Points& operator=(const Points&);
struct Point
{
int centroid;
float xyz[3];
};
Point* data;
};

class Centroids
{
public:
Centroids(Points& points): data(malloc16_t<Centroid>(num_centroids))
{
// Naive initial selection algorithm, but outside the
// current area of interest.
for (int c=0; c < num_centroids; ++c)
init(c, points[c]);
}
~Centroids()
{
free16(data);
}
void init(int n, const float* xyz)
{
data[n].count = 0;
data[n].xyz[0] = xyz[0];
data[n].xyz[1] = xyz[1];
data[n].xyz[2] = xyz[2];
}
void reset(int n)
{
data[n].count = 0;
data[n].xyz[0] = 0.0f;
data[n].xyz[1] = 0.0f;
data[n].xyz[2] = 0.0f;
}
void sum(int n, const float* pt_xyz)
{
data[n].xyz[0] += pt_xyz[0];
data[n].xyz[1] += pt_xyz[1];
data[n].xyz[2] += pt_xyz[2];
++data[n].count;
}
void average(int n)
{
if (data[n].count > 0)
{
const float inv_count = 1.0f / data[n].count;
data[n].xyz[0] *= inv_count;
data[n].xyz[1] *= inv_count;
data[n].xyz[2] *= inv_count;
}
}
float* operator[](int n)
{
return data[n].xyz;
}
int find_nearest(const float* pt_xyz) const
{
float min_dist_squared = numeric_limits<float>::max();
int min_centroid = -1;
for (int c=0; c < num_centroids; ++c)
{
const float* cen_xyz = data[c].xyz;
const float x = pt_xyz[0] - cen_xyz[0];
const float y = pt_xyz[1] - cen_xyz[1];
const float z = pt_xyz[2] - cen_xyz[2];
const float dist_squared = x*x + y*y * z*z;

if (min_dist_squared > dist_squared)
{
min_dist_squared = dist_squared;
min_centroid = c;
}
}
return min_centroid;
}

private:
Centroids(const Centroids&);
Centroids& operator=(const Centroids&);
struct Centroid
{
int count;
float xyz[3];
};
Centroid* data;
};

// A high-precision real timer would be nice, but we lack C++11 and
// the coarseness of the testing here should allow this to suffice.
static double sys_time()
{
return static_cast<double>(clock()) / CLOCKS_PER_SEC;
}

static void k_means(Points& points, Centroids& centroids)
{
// Find the closest centroid for each point.
for (int p=0; p < num_points; ++p)
{
const float* pt_xyz = points[p];
points.associate(p, centroids.find_nearest(pt_xyz));
}

// Reset the data of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.reset(c);

// Compute new position sum of each centroid.
for (int p=0; p < num_points; ++p)
centroids.sum(points.centroid(p), points[p]);

// Compute average position of each centroid.
for (int c=0; c < num_centroids; ++c)
centroids.average(c);
}

int main()
{
Points points;
Centroids centroids(points);

cout << "Starting simulation..." << endl;
double start_time = sys_time();
for (int i=0; i < num_iterations; ++i)
k_means(points, centroids);
cout << "Time passed: " << (sys_time() - start_time) << " secs" << endl;
cout << "# Points: " << num_points << endl;
cout << "# Centroids: " << num_centroids << endl;

// Write the centroids to a file to give us some crude verification
// of consistency as we make changes.
ofstream out("centroids.txt");
for (int c=0; c < num_centroids; ++c)
out << "Centroid " << c << ": " << centroids[c][0] << "," << centroids[c][1] << "," << centroids[c][2] << endl;
}
``````

I'm aware of the dangers of superficial testing, but since it's already deemed to be a hotspot from previous real-world sessions, I hope it's excusable. I'm also just interested in the general techniques associated with micro-optimizing such code.

I did get slightly different results in profiling this one. The times are a bit more evenly dispersed within the loop here, and I'm not sure why. Perhaps it's because the data is smaller (I omitted members and hoisted out the `min_dist` member and made it a local variable). The exact ratio between centroids to points is also a bit different, but hopefully close enough to translate improvements here to the original code. It's also single-threaded in this superficial test, and the disassembly looks quite different so I may be risking optimizing this superficial test without the original (a risk I'm willing to take for now, as I'm more interested in expanding my knowledge of techniques that could optimize these cases rather than a solution for this exact case).

## Update with Yochai Timmer's Suggestion -- ~12.5 secs

Oh, I face the woes of micro-optimization without understanding assembly very well. I replaced this:

``````        -if (min_dist_squared > dist_squared)
-{
-    min_dist_squared = dist_squared;
-    pt.centroid = c;
-}
``````

With this:

``````        +const bool found_closer = min_dist_squared > dist_squared;
+pt.centroid = bitselect(found_closer, c, pt.centroid);
+min_dist_squared = bitselect(found_closer, dist_squared, min_dist_squared);
``````

.. only to find the times escalated from ~5.6 secs to ~12.5 secs. Nevertheless, that is not his fault nor does it take away from the value of his solution -- that's mine for failing to understand what's really going on at the machine level and taking stabs in the dark. That one apparently missed, and apparently I was not the victim of branch misprediction as I initially thought. Nevertheless, his proposed solution is a wonderful and generalized function to try in such cases, and I'm grateful to add it to my toolbox of tips and tricks. Now for round 2.

## Harold's SIMD Solution - 2.496 secs (see caveat)

This solution might be amazing. After converting the cluster rep to SoA, I'm getting times of ~2.5 seconds with this one! Unfortunately, there appears to be a glitch of some sort. I'm getting very different results for the final output that suggests more than slight precision differences, including some centroids towards the end with values of 0 (implying that they were not found in the search). I've been trying to go through the SIMD logic with the debugger to see what might be up -- it could merely be a transcription error on my part, but here's the code in case someone could spot the error.

If the error could be corrected without slowing down the results, this speed improvement is more than I ever imagined from a pure micro-optimization!

``````    // New version of Centroids::find_nearest (from harold's solution):
int find_nearest(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[0]), _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(_mm_set1_ps(pt_xyz[1]), _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(_mm_set1_ps(pt_xyz[2]), _mm_load_ps(cen_z + i));
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
min_dist = _mm_min_ps(min_dist, dist);
}

ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);

float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
``````

## Harold's SIMD Solution (Corrected) - ~2.5 secs

After applying the corrections and testing them out, the results are intact and function correctly with similar improvements to the original codebase!

Since this hits the holy grail of knowledge I was seeking to understand better (branchless SIMD), I'm going to award the solution with some extra props for more than doubling the speed of the operation. I have my homework cut out in trying to understand it, since my goal was not merely to mitigate this hotspot, but to expand on my personal understanding of possible solutions to deal with them.

Nevertheless, I'm grateful for all the contributions here from the algorithmic suggestions to the really cool `bitselect` trick! I wish I could accept all the answers. I may end up trying all of them at some point, but for now I have my homework cut out in understanding some of these non-arithmetical SIMD ops.

``````int find_nearest_simd(const float* pt_xyz) const
{
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
__m128 pt_xxxx = _mm_set1_ps(pt_xyz[0]);
__m128 pt_yyyy = _mm_set1_ps(pt_xyz[1]);
__m128 pt_zzzz = _mm_set1_ps(pt_xyz[2]);

_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i=4; i < num_centroids; i += 4)
{
xdif = _mm_sub_ps(pt_xxxx, _mm_load_ps(cen_x + i));
ydif = _mm_sub_ps(pt_yyyy, _mm_load_ps(cen_y + i));
zdif = _mm_sub_ps(pt_zzzz, _mm_load_ps(cen_z + i));
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
min_dist = _mm_min_ps(min_dist, dist);
}

ALIGN16 float mdist[4];
ALIGN16 uint32_t mindex[4];
_mm_store_ps(mdist, min_dist);
_mm_store_si128((__m128i*)mindex, min_index);

float closest = mdist[0];
int closest_i = mindex[0];
for (int i=1; i < 4; i++)
{
if (mdist[i] < closest)
{
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
``````

Solution

• Too bad we can't use SSE4.1, but very well then, SSE2 it is. I haven't tested this, just compiled it to see if there were syntax errors and to see whether the assembly made sense (it's mostly alright, though GCC spills `min_index` even with some `xmm` registers not used, not sure why that happens)

``````int find_closest(float *x, float *y, float *z,
float pt_x, float pt_y, float pt_z, int n) {
__m128i min_index = _mm_set_epi32(3, 2, 1, 0);
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
__m128i index = min_index;
for (int i = 4; i < n; i += 4) {
xdif = _mm_sub_ps(_mm_set1_ps(pt_x), _mm_load_ps(x + i));
ydif = _mm_sub_ps(_mm_set1_ps(pt_y), _mm_load_ps(y + i));
zdif = _mm_sub_ps(_mm_set1_ps(pt_z), _mm_load_ps(z + i));
_mm_mul_ps(ydif, ydif)),
_mm_mul_ps(zdif, zdif));
min_dist = _mm_min_ps(min_dist, dist);
}
float mdist[4];
_mm_store_ps(mdist, min_dist);
uint32_t mindex[4];
_mm_store_si128((__m128i*)mindex, min_index);
float closest = mdist[0];
int closest_i = mindex[0];
for (int i = 1; i < 4; i++) {
if (mdist[i] < closest) {
closest = mdist[i];
closest_i = mindex[i];
}
}
return closest_i;
}
``````

As usual, it expects the pointers to be 16-aligned. Also, the padding should be with points at infinity (so they're never closest to the target).

SSE 4.1 would let you replace this

``````min_index = _mm_or_si128(_mm_and_si128(index, mask),
``````

By this

``````min_index = _mm_blendv_epi8(min_index, index, mask);
``````

Here's an asm version, made for vsyasm, tested a bit (seems to work)

``````bits 64

section .data

align 16
centroid_four:
dd 4, 4, 4, 4
centroid_index:
dd 0, 1, 2, 3

section .text

global find_closest

proc_frame find_closest
;
;   arguments:
;       ecx: number of points (multiple of 4 and at least 4)
;       rdx -> array of 3 pointers to floats (x, y, z) (the points)
;       r8 -> array of 3 floats (the reference point)
;
alloc_stack 0x58
save_xmm128 xmm6, 0
save_xmm128 xmm7, 16
save_xmm128 xmm8, 32
save_xmm128 xmm9, 48
[endprolog]
movss xmm0, [r8]
shufps xmm0, xmm0, 0
movss xmm1, [r8 + 4]
shufps xmm1, xmm1, 0
movss xmm2, [r8 + 8]
shufps xmm2, xmm2, 0
; pointers to x, y, z in r8, r9, r10
mov r8, [rdx]
mov r9, [rdx + 8]
mov r10, [rdx + 16]
; reference point is in xmm0, xmm1, xmm2 (x, y, z)
movdqa xmm3, [rel centroid_index]   ; min_index
movdqa xmm4, xmm3                   ; current index
movdqa xmm9, [rel centroid_four]     ; index increment
; calculate initial min_dist, xmm5
movaps xmm5, [r8]
subps xmm5, xmm0
movaps xmm7, [r9]
subps xmm7, xmm1
movaps xmm8, [r10]
subps xmm8, xmm2
mulps xmm5, xmm5
mulps xmm7, xmm7
mulps xmm8, xmm8
sub ecx, 4
jna _tail
_loop:
movaps xmm6, [r8]
subps xmm6, xmm0
movaps xmm7, [r9]
subps xmm7, xmm1
movaps xmm8, [r10]
subps xmm8, xmm2
mulps xmm6, xmm6
mulps xmm7, xmm7
mulps xmm8, xmm8
movaps xmm7, xmm6
cmpps xmm6, xmm5, 1
minps xmm5, xmm7
movdqa xmm7, xmm6
pand xmm6, xmm4
pandn xmm7, xmm3
por xmm6, xmm7
movdqa xmm3, xmm6
sub ecx, 4
ja _loop
_tail:
; calculate horizontal minumum
pshufd xmm0, xmm5, 0xB1
minps xmm0, xmm5
pshufd xmm1, xmm0, 0x4E
minps xmm0, xmm1
; find index of the minimum
cmpps xmm0, xmm5, 0
movmskps eax, xmm0
bsf eax, eax
; index into xmm3, sort of
movaps [rsp + 64], xmm3
mov eax, [rsp + 64 + rax * 4]
movaps xmm9, [rsp + 48]
movaps xmm8, [rsp + 32]
movaps xmm7, [rsp + 16]
movaps xmm6, [rsp]
``````extern "C" int find_closest(int n, float** points, float* reference_point);