I know the title is a bit broad so let me elaborate.
I have 2 processes running, one is writing into the shared memory, the other is reading from it.
To achieve shared memory effect I am using boost::interprocess (btw let me know if there are more convenient libraries).
So I implemented the following:
//Writer
#include <boost/interprocess/shared_memory_object.hpp>
#include <boost/interprocess/windows_shared_memory.hpp>
#include <boost/interprocess/mapped_region.hpp>
#include <iostream>
namespace ip = boost::interprocess;
class SharedMemory
{
public:
template<typename OpenOrCreate>
SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode, size_t size) :
name_(name),
sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode, size))
{
}
template<typename OpenOrCreate>
SharedMemory(OpenOrCreate criteria, const char* name, ip::mode_t mode) :
name_(name),
sm_(std::make_shared<ip::windows_shared_memory>(criteria, name, mode))
{
}
std::shared_ptr<ip::windows_shared_memory> getSM()
{
return sm_;
}
private:
std::function<void()> destroyer_;
std::string name_;
std::shared_ptr<ip::windows_shared_memory> sm_;
};
int main()
{
SharedMemory creator(ip::create_only, "SharedMemory", ip::read_write, 10);
ip::mapped_region region(*creator.getSM(), ip::read_write);
std::memset(region.get_address(), 1, region.get_size());
int status = system("reader.exe");
std::cout << status << std::endl;
}
So I am creating shared memory, writing 1 to it then calling the reader exe. (I skip the reader part as its pretty much the same but instead of write it reads)
This code works fine, I write into memory and the other process reads it and prints my 1's.
But what if I have this 2 exes running at the same time and I want to write into memory then notify the other process that there is an update? How to signal from one exe/process to another?
The scenario is that I am streaming some live data, writing into memory and then telling the other process that there is an update.
I think there are more convenient approaches indeed.
In principle to synchronize between processes you use all the same approaches as synchronizing inside a process (between threads): using synchronization primitives (mutex/critical section, condition variable, semaphores, barriers etc.).
In addition, you need to have a data structure that you synchronize. This is precisely the Achilles' heel at the moment. There is a total absence of data structure here.
Though you can do raw byte access with your own logic, I don't see the appeal of using a high-level library in doing so. Instead I'd use a managed memory segment, that lets you find or construct typed objects by name. This may include your synchronization primitives.
In fact, you can expedite the process by using a message_queue
which has all the synchronization already built-in.
I'll provide portable code because I donot have a windows machine. First let's think of a datastructure. A simple example would be a queue of messages. Let's use a deque<string>
.
Not exactly trivial data structures, but the great news is that Boost Interprocess comes with all the nuts and bolts to make things work (using interprocess allocators).
namespace Shared {
using Segment = ip::managed_shared_memory;
using Mgr = Segment::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
template <typename T> using Deque = bc::deque<T, Alloc<T>>;
using String = bc::basic_string<char, std::char_traits<char>, Alloc<char>>;
using DataStructure = Deque<String>;
class Memory {
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
{
}
DataStructure& get() { return data_; }
DataStructure const& get() const { return data_; }
private:
std::string name_;
Segment sm_;
DataStructure& data_;
};
} // namespace Shared
There, now we can have the writer be something like:
int main()
{
Shared::Memory creator("SharedMemory", 10*1024*1024);
creator.get().emplace_back("Hello");
creator.get().emplace_back("World");
std::cout << "Total queued: " << creator.get().size() << "\n";
}
Which will print e.g.
Total queued: 2
Total queued: 4
Total queued: 6
Depending on the number of times you ran it.
Now lets do the reader side. In fact it's so much the same, let's put it in the same main program:
int main(int argc, char**)
{
Shared::Memory mem("SharedMemory", 10*1024*1024);
auto& data = mem.get();
bool is_reader = argc > 1;
if (not is_reader) {
data.emplace_back("Hello");
data.emplace_back("World");
std::cout << "Total queued: " << data.size() << "\n";
} else {
std::cout << "Found entries: " << data.size() << "\n";
while (!data.empty()) {
std::cout << "Dequeued " << data.front() << "\n";
data.pop_front();
}
}
}
Simple for a start. Now running e.g. test.exe READER
will conversely print something like:
The goal is to run writer and reader concurrently. That's not safe as it is now, because of a lack of locking and synchronization. Let's add it:
class Memory {
static constexpr size_t max_capacity = 100;
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, mx_(*sm_.find_or_construct<Mutex>("mutex")())
, cv_(*sm_.find_or_construct<Cond>("condition")())
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
{ }
// ...
private:
std::string name_;
Segment sm_;
Mutex& mx_;
Cond& cv_;
DataStructure& data_;
};
Now let's be careful. Because we want all operations on the data_
queue to be synchronized, we shall not expose it as we did before (with the get()
member function). Instead we expose the exact interface of operations we support:
size_t queue_length() const;
void enqueue(std::string message); // blocking when queue at max_capacity
std::string dequeue(); // blocking dequeue
std::optional<std::string> try_dequeue(); // non-blocking dequeue
These all do the locking as required, simply as you'd expect:
size_t queue_length() const {
ip::scoped_lock<Mutex> lk(mx_);
return data_.size();
}
It gets more interesting on the potentially blocking operations. I chose to have a maximum capacity, so enqueue
needs to wait for capacity:
// blocking when queue at max_capacity
void enqueue(std::string message) {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return data_.size() < max_capacity; });
data_.emplace_back(std::move(message));
cv_.notify_one();
}
Conversely, dequeue
needs to wait for a message to become available:
// blocking dequeue
std::string dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return not data_.empty(); });
return do_pop();
}
Alternatively, you could make it non-blocking, just optionally returning a value:
// non-blocking dequeue
std::optional<std::string> try_dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
if (data_.empty())
return std::nullopt;
return do_pop();
}
Now in main let's have three versions: writer, reader and continuous reader (where the latter demonstrates the blocking interface):
#include <boost/interprocess/allocators/allocator.hpp>
#include <boost/interprocess/managed_shared_memory.hpp>
#include <boost/interprocess/sync/interprocess_condition_any.hpp>
#include <boost/interprocess/sync/interprocess_mutex.hpp>
#include <boost/interprocess/sync/scoped_lock.hpp>
#include <boost/container/scoped_allocator.hpp>
#include <boost/interprocess/containers/deque.hpp>
#include <boost/interprocess/containers/string.hpp>
#include <iostream>
#include <iomanip>
#include <optional>
namespace ip = boost::interprocess;
namespace bc = boost::container;
namespace Shared {
using Segment = ip::managed_shared_memory;
using Mgr = Segment::segment_manager;
template <typename T>
using Alloc = bc::scoped_allocator_adaptor<ip::allocator<T, Mgr>>;
template <typename T> using Deque = ip::deque<T, Alloc<T>>;
using String = ip::basic_string<char, std::char_traits<char>, Alloc<char>>;
using DataStructure = Deque<String>;
using Mutex = ip::interprocess_mutex;
using Cond = ip::interprocess_condition;
class Memory {
static constexpr size_t max_capacity = 100;
public:
Memory(const char* name, size_t size)
: name_(name)
, sm_(ip::open_or_create, name, size)
, mx_(*sm_.find_or_construct<Mutex>("mutex")())
, cv_(*sm_.find_or_construct<Cond>("condition")())
, data_(*sm_.find_or_construct<DataStructure>("data")(
sm_.get_segment_manager()))
{ }
size_t queue_length() const {
ip::scoped_lock<Mutex> lk(mx_);
return data_.size(); // caution: racy by design!
}
// blocking when queue at max_capacity
void enqueue(std::string message) {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return data_.size() < max_capacity; });
data_.emplace_back(std::move(message));
cv_.notify_one();
}
// blocking dequeue
std::string dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
cv_.wait(lk, [this] { return not data_.empty(); });
return do_pop();
}
// non-blocking dequeue
std::optional<std::string> try_dequeue() {
ip::scoped_lock<Mutex> lk(mx_);
if (data_.empty())
return std::nullopt;
return do_pop();
}
private:
std::string name_;
Segment sm_;
Mutex& mx_;
Cond& cv_;
DataStructure& data_;
// Assumes mx_ locked by current thread!
std::string do_pop() {
auto&& tmp = std::move(data_.front());
data_.pop_front();
cv_.notify_all(); // any of the waiters might be a/the writer
return std::string(tmp.begin(), tmp.end());
}
};
} // namespace Shared
int main(int argc, char**)
{
Shared::Memory mem("SharedMemory", 10*1024*1024);
switch (argc) {
case 1:
mem.enqueue("Hello");
mem.enqueue("World");
std::cout << "Total queued: " << mem.queue_length() << "\n";
break;
case 2:
std::cout << "Found entries: " << mem.queue_length() << "\n";
while (auto msg = mem.try_dequeue()) {
std::cout << "Dequeued " << *msg << "\n";
}
break;
case 3:
std::cout << "Continuous reader\n";
while (true) {
std::cout << "Dequeued " << mem.dequeue() << "\n";
}
break;
}
}
Little demo:
Note there are some loose ends with the above. Notably, the absence of robust locks in Boost Interprocess needs some extra care for proper shutdown without holding the lock.
I'd suggest to contrast with ip::message_queue
as well: