Search code examples
c++boostinterprocess

What is the right way to send a serialized struct over Boost.Interprocess?


I am trying to send a struct of data over shared memory, using boost interprocess. I want to serialize the data and send it as binary.

I am basing my approach on this answer:

Send complex data structure via boost message queue

However, once I switched to my struct type, my code no longer sends the data reliably: everything compiles and runs fine, but when I close and re-open the receiver application, the data is no longer sent/received. How can I do this in such a way that the receiver can be closed and re-opened, and will automatically pick up the connection again?

  • File datastruct.hpp

     #ifndef DATASTRUCT_HPP
     #define DATASTRUCT_HPP
    
     #include <string>
     #include <vector>
    
     #include <boost/serialization/vector.hpp>
    
     #define MAX_SIZE 150000
    
     namespace dataStruct {

         /// A single 3-D point, stored as three single-precision coordinates.
         struct VUserPoint {
             float PositionX;
             float PositionY;
             float PositionZ;

             /// Serialization hook: hands the coordinates to the archive in X, Y, Z order.
             template <class Ar> void serialize(Ar& archive, const unsigned int /*version*/) {
                 archive & PositionX & PositionY & PositionZ;
             }
         };

         /// One frame of data: the points of the frame plus the frame's number.
         struct FvpData {

             std::vector<VUserPoint> UserPoints;  // points belonging to the current frame
             int                     frameNumber; // number of the frame

             /// Serialization hook: archives the point list first, then the frame number.
             template <class Ar> void serialize(Ar& archive, const unsigned int /*version*/) {
                 archive & UserPoints & frameNumber;
             }
         };

     } // namespace dataStruct
    
     #endif // DATASTRUCT_HPP
    
  • File sender.cpp

     #include "DataSender.h"
     #include <boost/archive/binary_iarchive.hpp>
     #include <boost/archive/binary_oarchive.hpp>
     #include <boost/archive/text_oarchive.hpp>
     #include <boost/interprocess/ipc/message_queue.hpp>
    
     using namespace boost::interprocess;
    
     int num = 1;
    
     /**
      * Packs the given points and the next frame number into a dataStruct::FvpData,
      * serializes it with a Boost binary archive and posts the bytes on the "mq"
      * message queue (priority 1).
      *
      * Every coordinate is narrowed from double to float. Exceptions thrown by
      * Boost.Interprocess are written to std::cerr and not propagated.
      *
      * @param points points of the current frame.
      */
     void DataSender::Grab(std::vector<Eigen::Vector3d> points) {
         dataStruct::FvpData data;
         // add userpoints
         data.UserPoints.reserve(points.size());
         for (auto const& point : points) {
             dataStruct::VUserPoint pnt;

             // The narrowing is intentional: VUserPoint stores floats.
             pnt.PositionX = static_cast<float>(point.x());
             pnt.PositionY = static_cast<float>(point.y());
             pnt.PositionZ = static_cast<float>(point.z());

             data.UserPoints.push_back(pnt);
         }
         num             += 1;
         data.frameNumber = num;

         try {
             message_queue mq(open_or_create, "mq", 100, MAX_SIZE);

             std::stringstream oss;

             {
                 // Destroy the archive before the stream is read so that the
                 // serialized data is complete when it is copied out.
                 boost::archive::binary_oarchive oa(oss);
                 oa << data;
             }

             std::string const serialized_string(oss.str());
             mq.send(serialized_string.data(), serialized_string.size(), 1);

             std::cout << data.frameNumber << std::endl;
         } catch (interprocess_exception& ex) {
             std::cerr << ex.what() << std::endl;
         }
     }
    
  • File receiver.cpp

     #include <iostream>
     #include <string>
    
     #include <boost/archive/binary_iarchive.hpp>
     #include <boost/archive/binary_oarchive.hpp>
     #include <boost/archive/text_iarchive.hpp>
     #include <boost/interprocess/ipc/message_queue.hpp>
    
     #include "DataStruct.hpp"
    
     using namespace boost::interprocess;
    
     /**
      * Receiver: opens (or creates) the "mq" message queue and, for every message
      * received, deserializes a dataStruct::FvpData and prints its frame number.
      *
      * Runs until an exception is thrown; the exception text is written to std::cerr.
      */
     int main() {

         try {
             message_queue mq(open_or_create, "mq", 100, MAX_SIZE);

             message_queue::size_type recvd_size;
             unsigned int             priority;

             while (true) {
                 dataStruct::FvpData me;

                 std::string serialized_string;
                 serialized_string.resize(MAX_SIZE);
                 mq.receive(&serialized_string[0], serialized_string.size(), recvd_size, priority);

                 // Keep only the bytes that were actually received; the rest of
                 // the buffer is padding and must not reach the archive.
                 serialized_string.resize(recvd_size);
                 std::stringstream iss(serialized_string);

                 boost::archive::binary_iarchive ia(iss);
                 ia >> me;

                 std::cout << me.frameNumber << std::endl;
                 //   std::cout << me.name << std::endl;
             }
         } catch (interprocess_exception& ex) {
             std::cerr << ex.what() << std::endl;
         } catch (std::exception const& ex) {
             // e.g. an archive error on a malformed message
             std::cerr << ex.what() << std::endl;
         }

         //  message_queue::remove("mq");
     }
    

Solution

  • Firstly I simplified and extended the reproducer:

    • File datastruct.hpp

       #pragma once
       #include <boost/serialization/vector.hpp>
       static constexpr inline size_t MAX_SIZE = 150'000;
       namespace dataStruct {
           /// A single 3-D point, stored as three single-precision coordinates.
           struct VUserPoint {
               float PositionX;
               float PositionY;
               float PositionZ;

               /// Serialization hook: hands the coordinates to the archive in X, Y, Z order.
               template <typename Archive> void serialize(Archive& archive, unsigned /*version*/) {
                   archive & PositionX;
                   archive & PositionY;
                   archive & PositionZ;
               }
           };

           /// One frame of data: the points of the frame plus the frame's number.
           struct FvpData {
               std::vector<VUserPoint> UserPoints;
               int                     frameNumber;

               /// Serialization hook: archives the point list first, then the frame number.
               template <typename Archive> void serialize(Archive& archive, unsigned /*version*/) {
                   archive & UserPoints;
                   archive & frameNumber;
               }
           };
       } // namespace dataStruct
      
    • File sender.cpp

       #include "datastruct.hpp"
       #include <Eigen/Core>
       #include <boost/archive/binary_iarchive.hpp>
       #include <boost/archive/binary_oarchive.hpp>
       #include <boost/archive/text_oarchive.hpp>
       #include <boost/interprocess/ipc/message_queue.hpp>
       #include <iostream>
      
       namespace bip = boost::interprocess;
      
       static int num = 1;
      
       /// Builds a frame from `points` (coordinates narrowed to float), serializes it with a
       /// binary archive and sends it on the "mq" queue. Boost.Interprocess errors are only logged.
       void Grab(std::vector<Eigen::Vector3d> points) {
           dataStruct::FvpData frame;
           frame.frameNumber = ++num;
           for (auto const& src : points) {
               dataStruct::VUserPoint dst;
               dst.PositionX = static_cast<float>(src.x());
               dst.PositionY = static_cast<float>(src.y());
               dst.PositionZ = static_cast<float>(src.z());
               frame.UserPoints.push_back(dst);
           }

           try {
               bip::message_queue queue(bip::open_or_create, "mq", 100, MAX_SIZE);

               std::stringstream stream;
               {
                   // scoped so the archive is completed before the stream is read
                   boost::archive::binary_oarchive archive(stream);
                   archive << frame;
               }

               auto const payload = stream.str();
               queue.send(payload.data(), payload.size(), 1);

               std::cout << frame.frameNumber << std::endl;
           } catch (bip::interprocess_exception const& ex) {
               std::cerr << ex.what() << std::endl;
           }
       }
      
       #include <random>
       #include <thread>
       int main() {
           std::cout << std::fixed << std::setprecision(3);
      
           std::mt19937 gen(std::random_device{}());
           std::uniform_real_distribution<double> dist(-10, 10);
      
           for (int n = 100; n--; std::this_thread::sleep_for(std::chrono::seconds(1))) {
               std::vector<Eigen::Vector3d> points;
               for (int i = 0; i < 3; ++i) {
                   auto x = dist(gen), y = dist(gen), z = dist(gen);
                   points.emplace_back(x, y, z);
                   std::cout << x << " " << y << " " << z << std::endl;
               }
      
               Grab(points);
           }
       }
      
    • File receiver.cpp

       #include "datastruct.hpp"
       #include <boost/archive/binary_iarchive.hpp>
       #include <boost/archive/binary_oarchive.hpp>
       #include <boost/archive/text_iarchive.hpp>
       #include <boost/interprocess/ipc/message_queue.hpp>
       #include <iomanip>
       #include <iostream>
       #include <sstream>
       #include <string>
      
       namespace bip = boost::interprocess;
      
       int main() {
           std::cout << std::fixed << std::setprecision(3);
      
           try {
               bip::message_queue mq(bip::open_or_create, "mq", 100, MAX_SIZE);
      
               bip::message_queue::size_type recvd_size;
               unsigned int                  priority;
      
               while (true) {
                   dataStruct::FvpData me;
      
                   std::stringstream iss;
                   std::string       serialized_string;
                   serialized_string.resize(MAX_SIZE);
                   mq.receive(&serialized_string[0], MAX_SIZE, recvd_size, priority);
                   iss << serialized_string;
      
                   boost::archive::binary_iarchive ia(iss);
                   ia >> me;
      
                   std::cout << me.frameNumber << std::endl;
                   for (auto const& p : me.UserPoints)
                       std::cout << p.PositionX << " " << p.PositionY << " " << p.PositionZ << "\n";
      
               }
           } catch (bip::interprocess_exception const& ex) {
               std::cerr << ex.what() << std::endl;
           }
      
           //  message_queue::remove("mq");
       }
      

    Next up, I verified that normal operation is okay.

    The Problem: Safe Stop

    You don't say how you stop the program(s). Since there's nothing in the code shown, I might assume that you use Ctrl-C (or worse):

    As you can see you get a lock exception. You need to cleanly exit. You can implement your own graceful shutdown, or use signals to intercept Ctrl-C/termination:

    // Flag that the signal handler raises and the main loop is meant to poll.
    std::atomic_bool shutdown{false};

    // One worker thread runs the handler, so it executes as ordinary code.
    boost::asio::thread_pool io{1};
    boost::asio::signal_set  sigs(io, SIGINT, SIGTERM);
    sigs.async_wait([&shutdown](boost::system::error_code const& status, int signal_number) {
        std::cerr << "Signal " << ::strsignal(signal_number) << " (" << status.message() << ")" << std::endl;
        shutdown.store(true);
    });
    

    e.g. Adjusted sender.cpp

    #include "datastruct.hpp"
    #include <Eigen/Core>
    #include <boost/archive/binary_iarchive.hpp>
    #include <boost/archive/binary_oarchive.hpp>
    #include <boost/archive/text_oarchive.hpp>
    #include <boost/asio.hpp>
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <iostream>
    
    namespace bip = boost::interprocess;
    
    static int num = 1;
    
    /// Builds a frame from `points` (coordinates narrowed to float), serializes it with a
    /// binary archive and sends it on the "mq" queue. Boost.Interprocess errors are only logged.
    void Grab(std::vector<Eigen::Vector3d> points) {
        auto const to_user_point = [](Eigen::Vector3d const& v) {
            dataStruct::VUserPoint converted;
            converted.PositionX = static_cast<float>(v.x());
            converted.PositionY = static_cast<float>(v.y());
            converted.PositionZ = static_cast<float>(v.z());
            return converted;
        };

        dataStruct::FvpData frame;
        frame.frameNumber = ++num;
        for (auto const& point : points)
            frame.UserPoints.push_back(to_user_point(point));

        try {
            bip::message_queue queue(bip::open_or_create, "mq", 100, MAX_SIZE);

            std::stringstream stream;
            {
                // scoped so the archive is completed before the stream is read
                boost::archive::binary_oarchive archive(stream);
                archive << frame;
            }

            auto const payload = stream.str();
            queue.send(payload.data(), payload.size(), 1);

            std::cout << frame.frameNumber << std::endl;
        } catch (bip::interprocess_exception const& ex) {
            std::cerr << ex.what() << std::endl;
        }
    }
    
    #include <random>
    #include <thread>
    int main() {
        std::cout << std::fixed << std::setprecision(3);
    
        std::mt19937                           gen(std::random_device{}());
        std::uniform_real_distribution<double> dist(-10, 10);
    
        std::atomic_bool shutdown{false};
    
        boost::asio::thread_pool io{1};
        boost::asio::signal_set  sigs(io, SIGINT, SIGTERM);
        sigs.async_wait([&](boost::system::error_code const& ec, int num) {
            std::cerr << "Signal " << ::strsignal(num) << " (" << ec.message() << ")" << std::endl;
            shutdown = true;
        });
    
        try {
            for (; !shutdown; std::this_thread::sleep_for(std::chrono::seconds(1))) {
                std::vector<Eigen::Vector3d> points;
                for (int i = 0; i < 3; ++i) {
                    auto x = dist(gen), y = dist(gen), z = dist(gen);
                    points.emplace_back(x, y, z);
                    std::cout << x << " " << y << " " << z << std::endl;
                }
    
                Grab(points);
            }
        } catch (std::exception const& ex) {
            std::cerr << ex.what() << std::endl;
        }
    
        io.join();
    }
    

    See it live:

    Observations

    You serialize doubles as floats. That loses precision.

    You copy everything multiple times: first from Eigen to your data struct, then to a string stream, from the stream to a string then to the queue, etc.

    I'd suggest using serialization directly on the Eigen data. It seems to me it could be zero-copy:

    // Layout preconditions for treating a Vector3d as three contiguous doubles.
    static_assert(std::is_standard_layout<Eigen::Vector3d>::value);
    static_assert(sizeof(Eigen::Vector3d) == sizeof(double[3]));
    

    Sadly, Eigen's implementation is somehow not trivially-copyable (I think this may be incidental?). However you can see a lot of answers on the net (and on this site) showing how to serialize Eigen vectors and matrices directly.

    BONUS

    Here's a version that

    • avoids the double->float conversion

    • avoids unnecessary copies

    • avoids unnecessary archive overhead:

       // Archive options passed to every archive constructor to cut per-message overhead.
       static constexpr auto ar_flags = boost::archive::no_header     // no archive header
                                        | boost::archive::no_codecvt  // no codecvt handling
                                        | boost::archive::no_tracking; // no object tracking
      
    • avoids deadlocking on a full/empty queue by using timed_send and timed_receive

    Live On Compiler Explorer

    #include <boost/archive/binary_iarchive.hpp>
    #include <boost/archive/binary_oarchive.hpp>
    #include <boost/asio.hpp>
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <boost/iostreams/device/array.hpp>
    #include <boost/iostreams/stream.hpp>
    #include <boost/serialization/is_bitwise_serializable.hpp>
    #include <boost/serialization/vector.hpp>
    
    #include <Eigen/Core>
    #include <iomanip>
    #include <iostream>
    #include <random>
    #include <thread>
    static constexpr inline size_t MAX_SIZE = 150'000;
    
    // Compile-time layout checks backing the bitwise-serialization declarations below:
    // each vector type must be standard-layout and exactly three scalars wide.
    // tentative despite the types not being trivially-copyable
    // static_assert(std::is_trivially_copyable_v<Eigen::Vector3f>);
    static_assert(std::is_standard_layout_v<Eigen::Vector3f>);
    static_assert(sizeof(Eigen::Vector3f) == 3 * sizeof(float));
    // Marks Vector3f as bitwise serializable for Boost.Serialization
    // (presumably lets archives store it as raw bytes — confirm against the Boost docs).
    BOOST_IS_BITWISE_SERIALIZABLE(Eigen::Vector3f);
    
    // Same checks and declaration for the double-precision vector.
    // static_assert(std::is_trivially_copyable_v<Eigen::Vector3d>);
    static_assert(std::is_standard_layout_v<Eigen::Vector3d>);
    static_assert(sizeof(Eigen::Vector3d) == 3 * sizeof(double));
    BOOST_IS_BITWISE_SERIALIZABLE(Eigen::Vector3d);
    
    // Archive options shared by the sending and the receiving archive; both sides
    // must use the same flags.
    static constexpr auto ar_flags = boost::archive::no_header     // no archive header
                                     | boost::archive::no_codecvt  // no codecvt handling
                                     | boost::archive::no_tracking; // no object tracking
    
    // Set by the signal handler; polled by the sender and receiver loops.
    static std::atomic_bool g_shutdown{false};
    using namespace std::chrono_literals;
    namespace bip = boost::interprocess;
    namespace bio = boost::iostreams;

    // Name of the shared queue.
    static constexpr char const* QUEUE_NAME = "7be6183b-de45-4c06-9fae-f1fbdcd24b66";
    // NOTE: constructed during static initialization, i.e. before main() runs, so an
    // exception thrown here is not caught by main's try block.
    bip::message_queue g_mq(bip::open_or_create, QUEUE_NAME, 10, MAX_SIZE);

    /// Current time on the high-resolution clock.
    static std::chrono::high_resolution_clock::time_point now() {
        return std::chrono::high_resolution_clock::now();
    }
    // Interval used for the timed queue operations and for the pause between frames.
    static constexpr std::chrono::milliseconds timeres{500};
    
    /// Receives messages from g_mq until shutdown is requested. Each message is decoded
    /// as a frame number followed by a vector of Eigen::Vector3f, and both are printed.
    ///
    /// The queue is polled with timed_receive in slices of `timeres` so that the
    /// shutdown flag is checked regularly instead of blocking forever on an empty queue.
    void receiver() {
        for (std::string buf; !g_shutdown;) {
            buf.resize(MAX_SIZE);
            bip::message_queue::size_type recvd_size = 0;
            unsigned int                  priority   = 0;

            bool received = false;
            for (auto t = now() + timeres; !g_shutdown; t += timeres) {
                if (g_mq.timed_receive(&buf[0], MAX_SIZE, recvd_size, priority, t)) {
                    received = true;
                    break;
                }
            }
            // Shutdown was requested while waiting: nothing was received, so
            // recvd_size is meaningless and there is no message to decode.
            if (!received)
                break;

            buf.resize(recvd_size);

            bio::stream<bio::array_source>  arrstr(buf.data(), buf.size());
            boost::archive::binary_iarchive ia(arrstr, ar_flags);

            int                          frameNum;
            std::vector<Eigen::Vector3f> points;

            ia >> frameNum >> points;

            std::cout << frameNum << std::endl;
            for (auto const& p : points)
                std::cout << p.x() << " " << p.y() << " " << p.z() << "\n";
        }
    }
    
    void sender() {
        std::mt19937                           gen(std::random_device{}());
        std::uniform_real_distribution<double> dist(-10, 10);
        std::array<char, MAX_SIZE>             buf;
    
        for (int frameNum = 1; !g_shutdown; std::this_thread::sleep_for(timeres)) {
            std::vector<Eigen::Vector3f> points;
            for (int i = 0; i < 3; ++i) {
                auto x = dist(gen), y = dist(gen), z = dist(gen);
                points.emplace_back(x, y, z);
                std::cout << x << " " << y << " " << z << std::endl;
            }
    
            frameNum += 1;
            bio::stream<bio::array_sink> arrstr(buf.data(), buf.size());
            boost::archive::binary_oarchive(arrstr, ar_flags) << frameNum << points;
    
            for (auto t = now() + timeres; !g_shutdown; t += timeres) {
                if (g_mq.timed_send(buf.data(), arrstr.tellp(), 0, t)) {
                    std::cout << "Sent " << arrstr.tellp() << " bytes, frameNum " << frameNum << std::endl;
                    break;
                } else {
                    std::this_thread::sleep_until(t);
                    std::cout << "Waiting..." << std::endl;
                }
            }
        }
    }
    
    /// Signal-wait completion handler: logs the signal and the wait's status to
    /// std::cerr, then raises the global shutdown flag.
    void on_signal(boost::system::error_code ec, int signum) {
        char const* const signal_name = ::strsignal(signum);
        std::string const status      = ec.message();

        std::cerr << "Signal " << signal_name << " (" << status << ")" << std::endl;
        g_shutdown.store(true);
    }
    
    int main(int argc, char**) try {
        std::cout << std::fixed << std::setprecision(3);
        boost::asio::signal_set sigs(boost::asio::system_executor{}, SIGINT, SIGTERM);
        sigs.async_wait(on_signal);
    
        if (argc > 1) {
            receiver();
        } else {
            sender();
            // bip::message_queue::remove(QUEUE_NAME);
        }
    } catch (std::exception const& ex) {
        std::cerr << "Error: " << ex.what() << std::endl;
    }
    

    With a proper stress-test: