
C++ Google Test fails with multiple threads


I have the following C++ code and the corresponding unit tests in Google Test. I am studying a book on modern C++ programming using test-driven development. The code below crashes in the scale tests. As far as I can tell, the problem is in the updated function: if I comment out the mutex lock and the notify_all as shown below, and reduce the number of users to, say, 50 instead of 5000, the test case no longer crashes. Another observation: the same code works in another test case, "HandlesLargeNumbersOfUsers", with a single thread or multiple threads. Your inputs will help me move forward.

void updated(const User& user) override {
        // unique_lock<std::mutex> lock(mutex_);
        Count++;
        // wasExecuted_.notify_all();
    }


class GeoServerUsersInBoxTests : public testing::Test {
public:
    GeoServer server;

    const double TenMeters{ 10 };
    const double Width{ 2000 + TenMeters };
    const double Height{ 4000 + TenMeters };
    const string aUser{ "auser" };
    const string bUser{ "buser" };
    const string cUser{ "cuser" };

    Location aUserLocation{ 38, -103 };

    shared_ptr<ThreadPool> pool;

    virtual void SetUp() override {
        server.useThreadPool(pool);

        server.track(aUser);
        server.track(bUser);
        server.track(cUser);

        server.updateLocation(aUser, aUserLocation);
    }

    string userName(unsigned int i) {
        return string{ "user" + to_string(i) };
    }

    void addUsersAt(unsigned int number, const Location& location) {
        for (unsigned int i{ 0 }; i < number; i++) {
            string user = userName(i);
            server.track(user);
            server.updateLocation(user, location);
        }
    }
};

class AGeoServer_ScaleTests : public GeoServerUsersInBoxTests {

public:

    class GeoServerCountingListener : public GeoServerListener {
    public:
        void updated(const User& user) override {
            unique_lock<std::mutex> lock(mutex_);
            Count++;
            wasExecuted_.notify_all();
        }

        void waitForCountAndFailOnTimeout(unsigned int expectedCount,
            const milliseconds& time = milliseconds(10000)) {
            unique_lock<mutex> lock(mutex_);
            ASSERT_TRUE(wasExecuted_.wait_for(lock, time, [&]
                { return expectedCount == Count; }));
        }

        condition_variable wasExecuted_;
        unsigned int Count{ 0 };
        mutex mutex_;
    };

    GeoServerCountingListener countingListener;
    shared_ptr<thread> t;

    void SetUp() override {
        pool = make_shared<ThreadPool>();
        GeoServerUsersInBoxTests::SetUp();
    }

    void TearDown() override {
        t->join();
    }
};


TEST_F(AGeoServer_ScaleTests, HandlesLargeNumbersOfUsers) {
    pool->start(4);
    const unsigned int lots{ 5000 };
    addUsersAt(lots, Location{ aUserLocation.go(TenMeters, West) });

    t = make_shared<thread>(
        [&] { server.usersInBox(aUser, Width, Height, &countingListener); });

    countingListener.waitForCountAndFailOnTimeout(lots);
}

ThreadPool.h

class ThreadPool {
public:
    virtual ~ThreadPool() {
        stop();
    }

    void stop() {
        done_ = true;
        for (auto& thread : threads_) thread.join();
    }

    void start(unsigned int numberOfThreads = 1) {
        for (unsigned int i{ 0u }; i < numberOfThreads; i++)
            threads_.push_back(std::thread(&ThreadPool::worker, this));
    }

    bool hasWork() {
        std::lock_guard<std::mutex> block(mutex_);
        return !workQueue_.empty();
    }

    virtual void add(Work work) {
        std::lock_guard<std::mutex> block(mutex_);
        workQueue_.push_front(work);
    }

    Work pullWork() {
        std::lock_guard<std::mutex> block(mutex_);

        if (workQueue_.empty()) return Work{};

        auto work = workQueue_.back();
        workQueue_.pop_back();
        return work;
    }

private:
    void worker() {
        while (!done_) {
            while (!done_ && !hasWork())
                ;
            if (done_) break;
            pullWork().execute();
        }
    }

    std::atomic<bool> done_{ false };
    std::deque<Work> workQueue_;
    std::shared_ptr<std::thread> workThread_;
    std::mutex mutex_;
    std::vector<std::thread> threads_;
};

GeoServer.cpp

void GeoServer::track(const string& user) {
    locations_[user] = Location();
}

void GeoServer::stopTracking(const string& user) {
    locations_.erase(user);
}

bool GeoServer::isTracking(const string& user) const {
    return find(user) != locations_.end();
}

void GeoServer::updateLocation(const string& user, const Location& location) {
    locations_[user] = location;
}

Location GeoServer::locationOf(const string& user) const {
    if (!isTracking(user)) return Location{}; // TODO performance cost?

    return find(user)->second;
}

std::unordered_map<std::string, Location>::const_iterator
GeoServer::find(const std::string& user) const {
    return locations_.find(user);
}

bool GeoServer::isDifferentUserInBounds(
    const pair<string, Location>& each,
    const string& user,
    const Area& box) const {
    if (each.first == user) return false;
    return box.inBounds(each.second);
}

void GeoServer::usersInBox(
    const string& user, double widthInMeters, double heightInMeters,
    GeoServerListener* listener) const {
    auto location = locations_.find(user)->second;
    Area box{ location, widthInMeters, heightInMeters };

    for (auto& each : locations_) {
        Work work{ [&] {
           if (isDifferentUserInBounds(each, user, box))
              listener->updated(User{each.first, each.second});
        } };
        pool_->add(work);
    }
}

Solution

  • Let's look at this code:

    void GeoServer::usersInBox(
        const string& user, double widthInMeters, double heightInMeters, // #4
        GeoServerListener* listener) const { // #5
        auto location = locations_.find(user)->second;
        Area box{ location, widthInMeters, heightInMeters }; // #3
    
        for (auto& each : locations_) {
            Work work{ [&] { // #1
               if (isDifferentUserInBounds(each, user, box))
                  listener->updated(User{each.first, each.second});
            } };
            pool_->add(work); // #2
        }
    }
    

    Line #1 creates a work item. The work is a lambda with a by-reference capture-default ([&]; look it up if necessary). The lambda references four things from the outer scope: each, user, box and listener.

    Line #2 schedules the work item in the pool. Then execution continues; the work will be executed some time in the future.

    When the for loop is done, the function returns. Now things go very wrong.

    The captured box is a reference to a local variable defined at #3. This local variable disappears; the reference in the lambda is now dangling, and accessing it is undefined behavior.

    The captured listener is a reference to a function argument defined at #5. This argument disappears; the reference is now dangling. You may think, "oh, but that's a pointer, it's fine!" It's not. You're not capturing the pointer, you're capturing a reference to the pointer.

    The captured user is itself already a reference, defined at #4. While the argument disappears, the string it refers to doesn't, at least not yet. In your test, it so happens that the argument references aUser, which is a member of the test class and so sticks around. But it could have been a temporary object that is destroyed as soon as the method returns, leaving the captured reference dangling.

    Rule of thumb: do not use [&] for lambdas that you cannot guarantee will finish executing before you leave the lambda's surrounding scope.
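
  • A minimal sketch of one possible fix, assuming Work can wrap any copyable
    callable (as the original Work work{ [&] {...} } suggests) and that Area and
    the map entries are cheap enough to copy: capture everything the work item
    needs by value, so the closure no longer refers to locals that die when
    usersInBox returns.

    void GeoServer::usersInBox(
        const string& user, double widthInMeters, double heightInMeters,
        GeoServerListener* listener) const {
        auto location = locations_.find(user)->second;
        Area box{ location, widthInMeters, heightInMeters };

        for (auto& each : locations_) {
            // Copy each, user, box and the listener pointer into the closure.
            // The listener object and the GeoServer itself (this) still have
            // to outlive the queued work.
            Work work{ [this, each, user, box, listener] {
               if (isDifferentUserInBounds(each, user, box))
                  listener->updated(User{each.first, each.second});
            } };
            pool_->add(work);
        }
    }

    Copying per work item has a cost; an alternative is to keep capturing by
    reference but have usersInBox wait for all of its queued work to complete
    before returning, so the captured locals are guaranteed to outlive the work.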