I try to develop a concurrent prime sieve implementation using C++ atomics. However, when core_count
is increased, more and more small primes are missing from the output.
My guess is that the producer threads overwrite each others' results, before being read by the consumer. Even though the construction should protect against it by using the magic number 0
to indicate it's ready to accept the next prime. It seems the compare_exchange_weak
is not really atomic in this case.
Things I've tried:
compare_exchange_weak
with compare_exchange_strong
memory_order
to anything else.I have tested it with Microsoft Visual Studio 2019, Clang 12.0.1 and GCC 11.1.0, but to no avail.
Any ideas on this are welcome, including some best practices I might have missed.
#include <algorithm>
#include <atomic>
#include <future>
#include <iostream>
#include <iterator>
#include <thread>
#include <vector>
int main() {
using namespace std;
constexpr memory_order order = memory_order_relaxed;
atomic<int> output{0};
vector<atomic_bool> sieve(10000);
for (auto& each : sieve) atomic_init(&each, false);
atomic<unsigned> finished_worker_count{0};
auto const worker = [&output, &sieve, &finished_worker_count]() {
for (auto current = next(sieve.begin(), 2); current != sieve.end();) {
current = find_if(current, sieve.end(), [](atomic_bool& value) {
bool untrue = false;
return value.compare_exchange_strong(untrue, true, order);
});
if (current == sieve.end()) break;
int const claimed = static_cast<int>(distance(sieve.begin(), current));
int zero = 0;
while (!output.compare_exchange_weak(zero, claimed, order))
;
for (auto product = 2 * claimed; product < static_cast<int>(sieve.size());
product += claimed)
sieve[product].store(true, order);
}
finished_worker_count.fetch_add(1, order);
};
const auto core_count = thread::hardware_concurrency();
vector<future<void>> futures;
futures.reserve(core_count);
generate_n(back_inserter(futures), core_count,
[&worker]() { return async(worker); });
vector<int> result;
while (finished_worker_count < core_count) {
auto current = output.exchange(0, order);
if (current > 0) result.push_back(current);
}
sort(result.begin(), result.end());
for (auto each : result) cout << each << " ";
cout << '\n';
return 0;
}
compare_exchange_weak
will update (change) the "expected" value (the local variable zero
) if the update cannot be made. This will allow overwriting one prime number with another if the main thread doesn't quickly handle the first prime.
You'll want to reset zero
back to zero before rechecking:
while (!output.compare_exchange_weak(zero, claimed, order))
zero = 0;