Search code examples
c++ · boost · boost-interprocess

Creating an allocator that uses multiple managed_shared_memory segments in boost


To avoid having to unmap and remap all previously mapped regions when growing a managed_shared_memory segment, I want to create an allocator that creates a new managed_shared_memory segment whenever there is not enough space in the existing segments. I have looked into the Boost.Interprocess node allocators, but they don't seem like a good fit for this problem. Is there any class or utility in Boost that can help with this problem?


Solution

  • This is not a direct answer to the question, but it is the relevant outcome of the comment thread: an example that uses managed_external_buffer to gain more control over the on-disk format (anticipating backwards-compatible versioning and perhaps some integrity verification) and shows how to implement growth.

    Live On Compiler Explorer

    #include <boost/interprocess/file_mapping.hpp>
    #include <boost/interprocess/managed_external_buffer.hpp>
    #include <boost/interprocess/mapped_region.hpp>
    
    // sample data structures:
    #include <boost/interprocess/containers/string.hpp>
    #include <boost/interprocess/containers/vector.hpp>
    
    // filesystem stuff
    #include <fcntl.h>
    #include <filesystem>
    #include <fstream>
    #include <sys/stat.h>
    
    // convenience:
    // Name of the file that backs the example database.
    static constexpr char const* FILENAME = "data.bin";
    // User-defined literal: `N_MB` yields N mebibytes expressed in bytes
    // (N << 20). Declared constexpr so it is also usable in constant
    // expressions (array bounds, static_assert, constexpr variables).
    static constexpr unsigned long long operator""_MB(unsigned long long n) { return n << 20; }
    #include <algorithm>
    #include <array>
    #include <cassert>
    #include <cstdint>
    #include <cstdio>
    #include <exception>
    #include <iostream>
    #include <stdexcept>
    #include <boost/range/adaptor/indirected.hpp>
    #include <boost/range/algorithm/sort.hpp>
    
    namespace bip = boost::interprocess;
    namespace fs  = std::filesystem;
    using boost::adaptors::indirected;
    
    // Segment: the managed-memory front end used throughout this example. It is
    // a managed_external_buffer, i.e. it manages a buffer supplied by the caller.
    using Segment                     = bip::managed_external_buffer;
    // Alloc<T>: allocator for T bound to Segment's segment manager, so container
    // storage is taken from the managed buffer.
    template <typename T> using Alloc = bip::allocator<T, Segment::segment_manager>;
    // String / Database: a character string and a vector of such strings, both
    // allocating through Alloc.
    using String    = bip::basic_string<char, std::char_traits<char>, Alloc<char>>;
    using Database  = bip::vector<String, Alloc<String>>;
    // StringPtr / Index: offset_ptr to a String and a vector of those pointers.
    // NOTE(review): offset_ptr is presumably chosen over a raw pointer so the
    // stored references survive the buffer being mapped at another address.
    using StringPtr = bip::offset_ptr<String>;
    using Index     = bip::vector<StringPtr, Alloc<StringPtr>>;
    
    /**
     * A file-backed managed segment with a small versioning header.
     *
     * File layout: the first v1_magic_header.size() bytes hold a magic marker;
     * everything after it is mapped and handed to a managed_external_buffer
     * (Segment). Two named objects live inside the segment: a Database under
     * the name "vec" and an Index under the name "index".
     */
    struct MySharedSegment {
        /**
         * Creates a new file of `size` bytes, writes the magic header and
         * constructs an empty Database and Index in the remaining space.
         *
         * @param filename path of the file to create; must not exist yet
         * @param size     total file size in bytes, header included
         * @throws std::runtime_error("file already exists") if the path exists
         * @throws std::runtime_error("File short") if `size` cannot hold the header
         */
        MySharedSegment(bip::create_only_t, char const* filename, size_t size) {
            if (fs::exists(filename))
                throw std::runtime_error("file already exists");

            // Validate before touching the disk: detecting this only after the
            // file has been created would leave a stray file behind, and every
            // retry would then fail with "file already exists".
            if (size < v1_magic_header.size())
                throw std::runtime_error("File short");

            {
                // Create the (empty) file; the stream is closed right away.
                std::ofstream ofs(filename, std::ios::binary | std::ios::trunc);
            }

            // The file is empty at this point, so growing by `size` makes it
            // exactly `size` bytes long.
            grow(filename, size);
            fm = bip::file_mapping(filename, bip::mode_t::read_write);

            auto offset = write_magic_header();

            // Map everything behind the header and build a fresh segment in it.
            buf = bip::mapped_region(fm, bip::mode_t::read_write, offset,
                                     size - offset);
            mb  = Segment(bip::create_only, buf.get_address(), buf.get_size());
            auto* mgr = mb.get_segment_manager();

            _vec   = mb.find_or_construct<Database>("vec")(mgr);
            _index = mb.find_or_construct<Index>("index")(mgr);
        }

        /**
         * Opens an existing file, verifies its magic header and looks up the
         * Database and Index stored in it.
         *
         * If the mapped buffer is larger than the size the segment reports
         * (the file was grown since it was last used), the segment is grown by
         * the difference.
         *
         * @param filename path of an existing database file
         * @throws std::runtime_error("file not found") if the path does not exist
         * @throws std::runtime_error("Unknown database file format") if the
         *         header does not match a known version
         * @throws std::runtime_error("an expected object was not found") if
         *         "vec" or "index" is missing from the segment
         */
        MySharedSegment(bip::open_only_t, char const* filename)
        {
            if (!fs::exists(filename))
                throw std::runtime_error("file not found");
            fm = bip::file_mapping(filename, bip::mode_t::read_write);

            auto offset = check_magic_header();
            // No explicit size: map from the end of the header onwards.
            buf         = bip::mapped_region(fm, bip::mode_t::read_write, offset);

            mb = Segment(bip::open_only, buf.get_address(), buf.get_size());
            if (buf.get_size() > mb.get_size()) {
                // also grow segment if buffer grew
                mb.grow(buf.get_size() - mb.get_size());
            }

            auto [v, vok] = mb.find<Database>("vec");
            auto [i, iok] = mb.find<Index>("index");

            if (!(v && vok && i && iok)) {
                throw std::runtime_error("an expected object was not found");
            }

            _vec   = v;
            _index = i;
        }

        /**
         * Enlarges the file by `extra` bytes (new size = current size + extra).
         * Only the file is resized here; a segment picks up the extra space
         * the next time the file is opened with the open_only constructor.
         */
        static void grow(char const* filename, size_t extra) {
            fs::resize_file(filename, fs::file_size(filename) + extra);
        }

        /** Returns the Database stored in the segment. */
        Database& database() {
            assert(_vec);
            return *_vec;
        }

        /** Returns the Index stored in the segment. */
        Index& index() {
            assert(_index);
            return *_index;
        }

        /** Returns the segment manager, e.g. to construct allocators from. */
        Segment::segment_manager* get_segment_manager() {
            return mb.get_segment_manager();
        }

      private:
        /**
         * Writes the v1 magic header at the start of the file behind `fm`.
         *
         * @return the header length, i.e. the offset at which the segment starts
         * @throws std::runtime_error("File short") if the file cannot hold it
         */
        size_t write_magic_header() {
            auto HLEN = v1_magic_header.size();

            if (fs::file_size(fm.get_name()) < HLEN)
                throw std::runtime_error("File short");

            // Temporary mapping that covers only the header bytes.
            bip::mapped_region mr(fm, bip::mode_t::read_write, 0, HLEN);

            auto out = reinterpret_cast<uint8_t*>(mr.get_address());
            // `nxt` is only inspected by the assert below.
            [[maybe_unused]] auto nxt =
                std::copy(v1_magic_header.begin(), v1_magic_header.end(), out);
            assert(size_t(nxt - out) == HLEN);
            return HLEN;
        }

        /**
         * Checks that the file behind `fm` starts with a known magic header.
         *
         * @return the header length, i.e. the offset at which the segment starts
         * @throws std::runtime_error("Unknown database file format") if the
         *         file is too short or the header bytes do not match
         */
        size_t check_magic_header() {
            auto HLEN = v1_magic_header.size();

            if (fs::file_size(fm.get_name()) >= HLEN) {
                bip::mapped_region mr(fm, bip::mode_t::read_only, 0, HLEN);

                if (std::equal(
                        v1_magic_header.begin(), v1_magic_header.end(),
                        reinterpret_cast<uint8_t const*>(mr.get_address()))) {
                    return HLEN;
                }
            }
            // TODO future adds newer versions with different on disk formats
            throw std::runtime_error("Unknown database file format");
        }

        // Declaration order matters: members are destroyed in reverse order, so
        // the segment goes first, then the mapped region, then the file mapping.
        bip::file_mapping  fm;
        bip::mapped_region buf;
        Segment            mb;

        // Point into the mapped buffer; set by both constructors.
        Database* _vec   = nullptr;
        Index*    _index = nullptr;

        // Marker identifying version 1 of the on-disk format.
        static constexpr std::array<uint8_t, 16> v1_magic_header = {
            0x27, 0x65, 0xb6, 0xcb, 0x3a, 0x86, 0xf5, 0x48,
            0xba, 0xa3, 0x2c, 0x49, 0x00, 0xdd, 0x6f, 0xde,
        };
    };
    
    /**
     * Creates the database file FILENAME with a total size of `size` bytes,
     * stores three sample strings in it, indexes them and prints them in
     * sorted order on one line.
     *
     * `size` is a size_t rather than an int so that byte counts produced by
     * the `_MB` literal are not narrowed (2 GiB and above would overflow int).
     */
    void create_initial(size_t size) {
        MySharedSegment mss(bip::create_only, FILENAME, size);

        auto* mgr = mss.get_segment_manager();
        auto& db  = mss.database();

        db.emplace_back("one", mgr);
        db.emplace_back("two", mgr);
        db.emplace_back("three", mgr);

        // Build the index only after the database is fully populated, so the
        // element addresses taken below are not invalidated by vector growth.
        auto& index = mss.index();
        index.reserve(db.size());
        for (auto& elem : db) {
            index.emplace_back(&elem);
        }

        // NOTE(review): sorting through the `indirected` view appears to
        // reorder the String objects themselves rather than the pointers held
        // in `index` -- confirm that this is the intended effect.
        boost::sort(index | indirected);

        // Bind by const reference: copying a String would allocate a second
        // copy through the segment allocator just to print it.
        for (auto const& el : index | indirected) {
            std::cout << el << " ";
        }
        std::cout << "\n";
    }
    
    /**
     * Enlarges the database file FILENAME by `size` bytes. Intended for use
     * while no MySharedSegment has the file open ("offline" growth).
     *
     * `size` is a size_t rather than an int so that byte counts produced by
     * the `_MB` literal are not narrowed.
     */
    void offline_grow_with(size_t size) { MySharedSegment::grow(FILENAME, size); }
    
    /**
     * Re-opens the database file FILENAME and prints every string reachable
     * through the index on one line, showing that the stored pointers are
     * still usable after the file was closed (and possibly grown).
     */
    void reopen_and_verify() {
        MySharedSegment mss(bip::open_only, FILENAME);

        // none of the pointers in the index have become invalidated:
        // (bound by const reference -- copying a String would allocate a
        // second copy through the segment allocator just to print it)
        for (auto const& el : mss.index() | indirected) {
            std::cout << el << " ";
        }
        std::cout << "\n";
    }
    
    /**
     * Demo driver: start from a clean slate, create a 1 MB database, grow the
     * file by another 1 MB while it is closed, then re-open it and print its
     * contents again.
     *
     * Every step can throw (filesystem, mapping, or format errors). Failures
     * are reported on stderr and turned into a non-zero exit status instead
     * of ending in std::terminate.
     */
    int main()
    {
        try {
            // Remove any leftover file from a previous run; the result is
            // deliberately ignored because the file may simply not exist.
            std::remove(FILENAME);

            create_initial(1_MB);

            offline_grow_with(1_MB);

            reopen_and_verify();
        } catch (std::exception const& e) {
            std::cerr << "error: " << e.what() << "\n";
            return 1;
        }
    }
    

    Prints

    one three two 
    one three two 
    

    Notes

    The above still requires/assumes offline growth. You can probably add an interprocess shared mutex in the control headers (outside the managed segment buffer) and use it to do reader-writer locking so other parties automatically unmap the segment when growth is requested.