c++ atomic stdatomic sieve-of-eratosthenes

Missing small primes in C++ atomic prime sieve


I am trying to develop a concurrent prime sieve using C++ atomics. However, as core_count increases, more and more small primes are missing from the output.

My guess is that the producer threads overwrite each other's results before the consumer reads them, even though the design should prevent this by using the magic number 0 to signal that output is ready to accept the next prime. It looks as if compare_exchange_weak is not really atomic in this case.

Things I've tried:

  • Replacing compare_exchange_weak with compare_exchange_strong
  • Changing the memory_order to each of the other values
  • Swapping around the 'crossing-out' and the write.

I have tested it with Microsoft Visual Studio 2019, Clang 12.0.1 and GCC 11.1.0; the problem occurs with all three.

Any ideas on this are welcome, including some best practices I might have missed.

#include <algorithm>
#include <atomic>
#include <future>
#include <iostream>
#include <iterator>
#include <thread>
#include <vector>

int main() {
  using namespace std;

  constexpr memory_order order = memory_order_relaxed;
  atomic<int> output{0};
  vector<atomic_bool> sieve(10000);
  for (auto& each : sieve) atomic_init(&each, false);
  atomic<unsigned> finished_worker_count{0};

  auto const worker = [&output, &sieve, &finished_worker_count]() {
    for (auto current = next(sieve.begin(), 2); current != sieve.end();) {
      current = find_if(current, sieve.end(), [](atomic_bool& value) {
        bool untrue = false;
        return value.compare_exchange_strong(untrue, true, order);
      });
      if (current == sieve.end()) break;
      int const claimed = static_cast<int>(distance(sieve.begin(), current));
      int zero = 0;
      while (!output.compare_exchange_weak(zero, claimed, order))
        ;
      for (auto product = 2 * claimed; product < static_cast<int>(sieve.size());
           product += claimed)
        sieve[product].store(true, order);
    }
    finished_worker_count.fetch_add(1, order);
  };

  const auto core_count = thread::hardware_concurrency();
  vector<future<void>> futures;
  futures.reserve(core_count);
  generate_n(back_inserter(futures), core_count,
             [&worker]() { return async(worker); });
  vector<int> result;
  while (finished_worker_count < core_count) {
    auto current = output.exchange(0, order);
    if (current > 0) result.push_back(current);
  }
  sort(result.begin(), result.end());
  for (auto each : result) cout << each << " ";
  cout << '\n';
  return 0;
}

Solution

  • When compare_exchange_weak fails, it overwrites its "expected" argument (here the local variable zero) with the value currently stored in output. After one failed attempt, zero therefore holds another thread's prime, and the next attempt succeeds if output still contains that prime. That overwrites one prime with another whenever the main thread has not yet consumed the first one.

    Reset zero to 0 before every retry:

    while (!output.compare_exchange_weak(zero, claimed, order))
       zero = 0;
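
The reset can be checked in isolation from the sieve. The following is a minimal sketch, not the asker's program: publish and collect_published are hypothetical names, the producers hand over plain consecutive integers instead of primes, and the default memory_order_seq_cst is used rather than the relaxed ordering from the question. It keeps the question's convention that 0 means "slot is empty".

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <thread>
#include <vector>

// Hypothetical helper: hand one nonzero value to the consumer through `slot`.
// 0 is the sentinel for "slot is empty", as in the question.
void publish(std::atomic<int>& slot, int value) {
  assert(value != 0);  // 0 is reserved for the empty state
  int expected = 0;
  while (!slot.compare_exchange_weak(expected, value)) {
    expected = 0;  // the failed attempt stored the slot's current value here
    std::this_thread::yield();
  }
}

// Hypothetical driver: each producer publishes its own range of distinct
// values; the calling thread collects them and returns them sorted.
std::vector<int> collect_published(unsigned producer_count, int per_producer) {
  std::atomic<int> slot{0};
  std::atomic<unsigned> finished{0};
  std::vector<std::thread> producers;
  producers.reserve(producer_count);
  for (unsigned p = 0; p < producer_count; ++p) {
    producers.emplace_back([&slot, &finished, p, per_producer] {
      for (int i = 1; i <= per_producer; ++i)
        publish(slot, static_cast<int>(p) * per_producer + i);
      finished.fetch_add(1);
    });
  }

  std::vector<int> received;
  while (finished.load() < producer_count) {
    int const value = slot.exchange(0);  // take the value and mark slot empty
    if (value > 0)
      received.push_back(value);
    else
      std::this_thread::yield();
  }
  // A producer may publish its last value and finish between the consumer's
  // last exchange and its loop check, so drain the slot once more.
  int const last = slot.exchange(0);
  if (last > 0) received.push_back(last);

  for (auto& producer : producers) producer.join();
  std::sort(received.begin(), received.end());
  return received;
}
```

Calling collect_published(4, 1000) should return every integer from 1 to 4000 exactly once. Removing the expected = 0 line brings back the lost values described above. The extra exchange after the loop is an addition of this sketch: without it, a value published just before the last producer finishes can be left unread in the slot.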