Tags: c++, multithreading, c++11, shared-ptr, libpqxx

Simple thread-safe vector for connections in a gRPC service


I'm trying to learn about concurrency, and I'm implementing a small connection pool in a grpc service that needs to make many connections to a postgres database.

I'm trying to implement a basic connectionPool to avoid creating a new connection for each request. To start, I attempted to create a thread-safe std::vector. When I run the grpc server, a single transaction is made and then the server blocks, but I can't work out what's going on. Any help would be appreciated.

class SafeVector {
    std::vector<pqxx::connection*> pool_;
    int size_;
    int max_size_;

    std::mutex m_;
    std::condition_variable cv_;
public:
    SafeVector(int size, int max_size) : size_(size), max_size_(max_size) {
        assert(size_ <= max_size_);
        for (size_t i = 0; i < size; ++i) {
            pool_.push_back(new pqxx::connection("some conn string"));
        }
    }
    SafeVector(SafeVector const&)=delete; // to be implemented
    SafeVector& operator=(SafeVector const&)=delete; // no assignment keeps things simple

    std::shared_ptr<pqxx::connection> borrow() {
        std::unique_lock<std::mutex> l(m_);
        cv_.wait(l, [this]{ return !pool_.empty(); });
        std::shared_ptr<pqxx::connection> res(pool_.back());
        pool_.pop_back();
        return res;
    }

    void surrender(std::shared_ptr<pqxx::connection> connection) {
        std::lock_guard<std::mutex> l(m_);
        pool_.push_back(connection.get());
        cv_.notify_all();
    }
};

In main, I then create SafeVector* s = new SafeVector(4, 10); and pass it into my service as ServiceA(s).

Inside ServiceA, I use the connection as follows:

std::shared_ptr<pqxx::connection> c = safeVector_->borrow();
c->perform(SomeTransactorImpl);
safeVector_->surrender(c);

I put a bunch of logging statements everywhere, and I'm pretty sure I have a fundamental misunderstanding of the core concept of either (1) shared_ptr or (2) the various locking structures.

In particular, it seems that after 4 connections are used (the maximum number of hardware threads on my machine), a seg fault (error 11) happens when attempting to return a connection in the borrow() method.

Any help would be appreciated. Thanks.


Solution

  • Smart pointers in C++ are about object ownership.

    Object ownership is about who gets to delete the object and when.

    A shared pointer means that who gets to delete the object, and when, is a shared concern. Once you have said "no single piece of code is permitted to delete this object on its own", you cannot take it back.

    In your code, you try to take an object with shared ownership and claim it for your SafeVector in surrender. This is not permitted. You try it anyhow with a call to .get(), but the right to delete that object remains owned by shared pointers.

    They proceed to delete it (maybe right away, maybe tomorrow) and your container has a dangling pointer to a deleted object.
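The point about .get() can be checked with a small sketch. Tracked is a hypothetical stand-in for pqxx::connection that only counts destructor calls, and destroyed_after_get is an illustrative helper name, not something from the question. It shows that a raw pointer taken with .get() does not keep the object alive once the last shared_ptr is gone.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for pqxx::connection; it only counts destructor calls.
struct Tracked {
    static int destroyed;
    ~Tracked() { ++destroyed; }
};
int Tracked::destroyed = 0;

// Returns how many Tracked objects were destroyed by the time the last
// shared_ptr went out of scope, even though a raw pointer from get() remains.
int destroyed_after_get() {
    Tracked::destroyed = 0;
    Tracked* raw = nullptr;
    {
        std::shared_ptr<Tracked> owner(new Tracked);
        raw = owner.get();  // observes the object; ownership stays with `owner`
    }                       // last shared_ptr is destroyed here, so the object is deleted
    (void)raw;              // `raw` now dangles and must not be dereferenced
    return Tracked::destroyed;
}
```

This is the same situation as pushing connection.get() into pool_: the vector ends up holding a pointer whose object the shared pointers will still delete.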


    Change your shared_ptrs to unique_ptrs. Add std::move as required to make it compile.

    In surrender, assert the supplied unique ptr is non-empty.

    And while you are in there, switch notify_all to notify_one, since each surrender returns only one connection:

    cv_.notify_one();
    

    I would also change the type of pool_ to:

    std::vector<std::unique_ptr<pqxx::connection>> pool_;
    

    and change the push_back in surrender to:

    pool_.push_back(std::move(connection));
    

    If you don't update the type of pool_, instead change .get() to .release(). Unlike shared_ptr, unique_ptr can give up ownership.
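Put together, the suggested changes might look roughly like the sketch below. It is not a drop-in replacement: FakeConnection is a hypothetical stand-in for pqxx::connection so the example compiles without libpqxx, and ConnectionPool and available() are illustrative names that do not appear in the question. It sticks to C++11, so it uses new rather than std::make_unique.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical stand-in for pqxx::connection.
struct FakeConnection {
    int id;
};

class ConnectionPool {
    std::vector<std::unique_ptr<FakeConnection>> pool_;  // the pool owns idle connections
    std::mutex m_;
    std::condition_variable cv_;

public:
    explicit ConnectionPool(std::size_t size) {
        for (std::size_t i = 0; i < size; ++i) {
            pool_.push_back(std::unique_ptr<FakeConnection>(
                new FakeConnection{static_cast<int>(i)}));
        }
    }
    ConnectionPool(ConnectionPool const&) = delete;
    ConnectionPool& operator=(ConnectionPool const&) = delete;

    // Blocks until a connection is available, then moves ownership to the caller.
    std::unique_ptr<FakeConnection> borrow() {
        std::unique_lock<std::mutex> l(m_);
        cv_.wait(l, [this] { return !pool_.empty(); });
        std::unique_ptr<FakeConnection> res = std::move(pool_.back());
        pool_.pop_back();
        return res;
    }

    // Takes ownership back from the caller and wakes one waiting borrower.
    void surrender(std::unique_ptr<FakeConnection> connection) {
        assert(connection);  // an empty pointer must never enter the pool
        std::lock_guard<std::mutex> l(m_);
        pool_.push_back(std::move(connection));
        cv_.notify_one();
    }

    // Number of idle connections currently held by the pool.
    std::size_t available() {
        std::lock_guard<std::mutex> l(m_);
        return pool_.size();
    }
};
```

At the call site, the borrowed pointer then has to be handed back with surrender(std::move(c)). After that call c is empty, so the service cannot accidentally keep using a connection the pool has reclaimed.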