Tags: c++, boost, ipc

Signal from one process to another C++


I know the title is a bit broad, so let me elaborate.
I have two processes running: one writes into shared memory, the other reads from it.
For the shared memory I am using boost::interprocess (by the way, let me know if there are more convenient libraries).

So I implemented the following:

//Writer

#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <cstdlib>    // std::system
#include <cstring>    // std::memset
#include <functional> // std::function
#include <iostream>
#include <memory>     // std::shared_ptr

namespace ip = boost::interprocess;
class SharedMemory
{
public:
    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode, size_t size) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode, size))
    {
    }

    template<typename OpenOrCreate>
    SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode) :
        name_(name),
        sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode))
    {
    }

    std::shared_ptr<ip::windows_shared_memory> getSM()
    {
        return sm_;
    }
private:
    std::function<void()> destroyer_; // note: currently unused
    std::string name_;
    std::shared_ptr<ip::windows_shared_memory> sm_;
};


int main()
{
    SharedMemory creator(ip::create_only, "SharedMemory", ip::read_write, 10);
    ip::mapped_region region(*creator.getSM(), ip::read_write);
    std::memset(region.get_address(), 1, region.get_size());

    int status = std::system("reader.exe");
    std::cout << status << std::endl;
}

So I am creating shared memory, writing 1s into it, then launching the reader exe. (I skip the reader code as it's pretty much the same, except that it reads instead of writes.)
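
For reference, the reader boils down to something like this (a sketch, trimmed to the essentials):

//Reader (sketch)

#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>

namespace ip = boost::interprocess;

int main()
{
    // Open the segment the writer created and map it read-only
    ip::windows_shared_memory sm(ip::open_only, "SharedMemory", ip::read_only);
    ip::mapped_region region(sm, ip::read_only);

    auto const* bytes = static_cast<const unsigned char*>(region.get_address());
    for (std::size_t i = 0; i < region.get_size(); ++i)
        std::cout << static_cast<int>(bytes[i]);
    std::cout << std::endl;
}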

This code works fine: I write into memory, and the other process reads it and prints my 1s.
But what if I have these two exes running at the same time, and I want to write into memory and then notify the other process that there is an update? How do I signal from one exe/process to another?

The scenario is that I am streaming some live data, writing into memory and then telling the other process that there is an update.


Solution

  • I think there are more convenient approaches indeed.

    In principle, to synchronize between processes you use the same approaches as for synchronizing inside a process (between threads): synchronization primitives (mutex/critical section, condition variable, semaphore, barrier, etc.).

    In addition, you need a data structure to synchronize. This is precisely the Achilles' heel at the moment: there is a total absence of a data structure here.

    Though you can do raw byte access with your own logic, I don't see the appeal of using a high-level library for that. Instead, I'd use a managed memory segment, which lets you find or construct typed objects by name. That may include your synchronization primitives.

    In fact, you can expedite the process by using a message_queue, which has all the synchronization built in (see the sketch at the end of this answer).

    Manual Sync: Writer using Segment Manager

    I'll provide portable code because I don't have a Windows machine. First, let's think of a data structure. A simple example would be a queue of messages. Let's use a deque<string>.

    Not exactly a trivial data structure, but the great news is that Boost Interprocess comes with all the nuts and bolts to make it work (using interprocess allocators).

    // (includes and the ip/bc namespace aliases appear in the full listing below)
    namespace Shared {
    
        using Segment = ip::managed_shared_memory;
        using Mgr     = Segment::segment_manager;
        template <typename T>
        using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
        template <typename T> using Deque = bc::deque<T, Alloc<T>>;
        using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;
    
        using DataStructure = Deque<String>;
    
        class Memory {
          public:
            Memory(const char* name, size_t size)
                : name_(name)
                , sm_(ip::open_or_create, name, size)
                , data_(*sm_.find_or_construct<DataStructure>("data")(
                      sm_.get_segment_manager()))
            {
            }
    
            DataStructure&       get()       { return data_; } 
            DataStructure const& get() const { return data_; } 
    
          private:
            std::string    name_;
            Segment        sm_;
            DataStructure& data_;
        };
    
    } // namespace Shared
    

    There, now we can have the writer be something like:

    int main()
    {
        Shared::Memory creator("SharedMemory", 10*1024*1024);
    
        creator.get().emplace_back("Hello");
        creator.get().emplace_back("World");
    
        std::cout << "Total queued: " << creator.get().size() << "\n";
    }
    

    Which will print e.g.

    Total queued: 2
    Total queued: 4
    Total queued: 6
    

    Depending on the number of times you ran it.

    The Reader side

    Now let's do the reader side. In fact, it's so similar that we can put it in the same main program:

    int main(int argc, char**)
    {
        Shared::Memory mem("SharedMemory", 10*1024*1024);
        auto& data = mem.get();
    
        bool is_reader = argc > 1;
    
        if (not is_reader) {
            data.emplace_back("Hello");
            data.emplace_back("World");
            std::cout << "Total queued: " << data.size() << "\n";
        } else {
            std::cout << "Found entries: " << data.size() << "\n";
            while (!data.empty()) {
                std::cout << "Dequeued " << data.front() << "\n";
                data.pop_front();
            }
        }
    
    }
    

    Simple for a start. Now running e.g. test.exe READER will conversely print something like:

    Found entries: 2
    Dequeued Hello
    Dequeued World

    Locking & Synchronization

    The goal is to run writer and reader concurrently. That's not safe as it is now, because of a lack of locking and synchronization. Let's add it:

    class Memory {
        static constexpr size_t max_capacity = 100;
      public:
        Memory(const char* name, size_t size)
            : name_(name)
            , sm_(ip::open_or_create, name, size)
            , mx_(*sm_.find_or_construct<Mutex>("mutex")())
            , cv_(*sm_.find_or_construct<Cond>("condition")())
            , data_(*sm_.find_or_construct<DataStructure>("data")(
                  sm_.get_segment_manager()))
        { }
    
        // ... 
    
      private:
        std::string    name_;
        Segment        sm_;
        Mutex&         mx_;
        Cond&          cv_;
        DataStructure& data_;
    };
    

    Now let's be careful. Because we want all operations on the data_ queue to be synchronized, we shall not expose it as we did before (with the get() member function). Instead we expose the exact interface of operations we support:

    size_t queue_length() const;
    void enqueue(std::string message); // blocking when queue at max_capacity
    std::string dequeue();             // blocking dequeue
    std::optional<std::string> try_dequeue(); // non-blocking dequeue
    

    These all do the locking required, just as you'd expect:

    size_t queue_length() const {
        ip::scoped_lock<Mutex> lk(mx_);
        return data_.size();
    }
    

    It gets more interesting with the potentially blocking operations. I chose to have a maximum capacity, so enqueue needs to wait for capacity:

    // blocking when queue at max_capacity
    void enqueue(std::string message) {
        ip::scoped_lock<Mutex> lk(mx_);
        cv_.wait(lk, [this] { return data_.size() < max_capacity; });
    
        data_.emplace_back(std::move(message));
        cv_.notify_one();
    }
    

    Conversely, dequeue needs to wait for a message to become available:

    // blocking dequeue
    std::string dequeue() {
        ip::scoped_lock<Mutex> lk(mx_);
        cv_.wait(lk, [this] { return not data_.empty(); });
    
        return do_pop();
    }
    

    Alternatively, you could make it non-blocking, just optionally returning a value:

    // non-blocking dequeue
    std::optional<std::string> try_dequeue() {
        ip::scoped_lock<Mutex> lk(mx_);
    
        if (data_.empty())
            return std::nullopt;
        return do_pop();
    }
    

    Now in main let's have three versions: writer, reader and continuous reader (where the latter demonstrates the blocking interface):

    #include <boost/interprocess/allocators/allocator.hpp>
    #include <boost/interprocess/managed_shared_memory.hpp>
    #include <boost/interprocess/sync/interprocess_condition.hpp>
    #include <boost/interprocess/sync/interprocess_mutex.hpp>
    #include <boost/interprocess/sync/scoped_lock.hpp>
    
    #include <boost/container/scoped_allocator.hpp>
    #include <boost/interprocess/containers/deque.hpp>
    #include <boost/interprocess/containers/string.hpp>
    
    #include <iostream>
    #include <iomanip>
    #include <optional>
    
    namespace ip = boost::interprocess;
    namespace bc = boost::container;
    
    namespace Shared {
    
        using Segment = ip::managed_shared_memory;
        using Mgr     = Segment::segment_manager;
        template <typename T>
        using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
        template <typename T> using Deque = ip::deque<T, Alloc<T>>;
        using String = ip::basic_string<char, std::char_traits<char>, Alloc<char>>;
    
        using DataStructure = Deque<String>;
        using Mutex         = ip::interprocess_mutex;
        using Cond          = ip::interprocess_condition;
    
        class Memory {
            static constexpr size_t max_capacity = 100;
          public:
            Memory(const char* name, size_t size)
                : name_(name)
                , sm_(ip::open_or_create, name, size)
                , mx_(*sm_.find_or_construct<Mutex>("mutex")())
                , cv_(*sm_.find_or_construct<Cond>("condition")())
                , data_(*sm_.find_or_construct<DataStructure>("data")(
                      sm_.get_segment_manager()))
            { }
    
            size_t queue_length() const {
                ip::scoped_lock<Mutex> lk(mx_);
                return data_.size(); // caution: may be stale as soon as the lock is released
            }
    
            // blocking when queue at max_capacity
            void enqueue(std::string message) {
                ip::scoped_lock<Mutex> lk(mx_);
                cv_.wait(lk, [this] { return data_.size() < max_capacity; });
    
                data_.emplace_back(std::move(message));
    
                cv_.notify_one();
            }
    
            // blocking dequeue
            std::string dequeue() {
                ip::scoped_lock<Mutex> lk(mx_);
                cv_.wait(lk, [this] { return not data_.empty(); });
    
                return do_pop();
            }
    
            // non-blocking dequeue
            std::optional<std::string> try_dequeue() {
                ip::scoped_lock<Mutex> lk(mx_);
    
                if (data_.empty())
                    return std::nullopt;
                return do_pop();
            }
    
          private:
            std::string    name_;
            Segment        sm_;
            Mutex&         mx_;
            Cond&          cv_;
            DataStructure& data_;
    
            // Assumes mx_ locked by current thread!
            std::string do_pop() {
                // copy the message out *before* pop_front invalidates the front element
                std::string msg(data_.front().begin(), data_.front().end());
                data_.pop_front();
                cv_.notify_all(); // any of the waiters might be a/the writer
                return msg;
            }
        };
    
    } // namespace Shared
    
    int main(int argc, char**)
    {
        Shared::Memory mem("SharedMemory", 10*1024*1024);
    
        switch (argc) {
        case 1:
            mem.enqueue("Hello");
            mem.enqueue("World");
            std::cout << "Total queued: " << mem.queue_length() << "\n";
            break;
        case 2:
            std::cout << "Found entries: " << mem.queue_length() << "\n";
            while (auto msg = mem.try_dequeue()) {
                std::cout << "Dequeued " << *msg << "\n";
            }
            break;
        case 3: 
            std::cout << "Continuous reader\n";
            while (true) {
                std::cout << "Dequeued " << mem.dequeue() << "\n";
            }
            break;
        }
    }
    

    A little demo: running test.exe with no arguments enqueues two messages, test.exe READER drains whatever is queued, and test.exe READER CONTINUOUS blocks, printing each message as it arrives.

    Summary, Caution

    Note there are some loose ends in the above. Notably, Boost Interprocess has no robust locks, so proper shutdown needs extra care: no process should exit (or die) while holding the lock.
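
    One common mitigation is to use timed locks and timed waits, so a peer that died while holding the mutex cannot block you forever. A minimal sketch (dequeue_for is a hypothetical addition to the Memory class above):

    #include <boost/date_time/posix_time/posix_time.hpp>

    // Hypothetical member of Shared::Memory: gives up after `timeout` instead
    // of blocking forever on a mutex that a crashed peer may still hold.
    std::optional<std::string> dequeue_for(boost::posix_time::time_duration timeout) {
        auto deadline = boost::posix_time::microsec_clock::universal_time() + timeout;

        ip::scoped_lock<Mutex> lk(mx_, deadline); // timed lock attempt
        if (!lk.owns())
            return std::nullopt; // couldn't lock in time; possibly a dead peer

        if (!cv_.timed_wait(lk, deadline, [this] { return !data_.empty(); }))
            return std::nullopt; // no message arrived before the deadline

        return do_pop();
    }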

    I'd also suggest contrasting with ip::message_queue:
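
    As a taste, a minimal sketch of the same writer/reader pair on top of ip::message_queue (the queue name and size limits here are illustrative):

    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <iostream>
    #include <string>

    namespace ip = boost::interprocess;

    int main(int argc, char**)
    {
        // open_or_create: room for 100 messages of at most 64 bytes each
        ip::message_queue mq(ip::open_or_create, "demo_queue", 100, 64);

        if (argc == 1) { // writer
            std::string msg = "Hello";
            mq.send(msg.data(), msg.size(), 0 /*priority*/);
        } else { // reader: blocks until a message arrives
            char buf[64];
            ip::message_queue::size_type recvd = 0;
            unsigned prio = 0;
            mq.receive(buf, sizeof(buf), recvd, prio);
            std::cout << "Received: " << std::string(buf, recvd) << "\n";
        }
    }

    All the locking, blocking, and notification is handled inside the queue, so there is nothing left to hand-roll.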