
Does a boost::interprocess::message_queue have to be created by the process that writes to it?


I have two processes that run the same code and communicate with each other through boost::interprocess::message_queue from Boost 1.58.0.

typedef boost::shared_ptr<boost::interprocess::message_queue> mq_ptr;

mq_ptr m_t2b_queue;
mq_ptr m_b2t_queue;


sink::sink(const char * conn_id, bool is_parent)
{
    const int MAX_MESSAGE_NUMBER = 100000;


    snprintf( m_t2b_queue_name, MAX_MQ_NAME_LEN, "%s_t2b", conn_id);
    snprintf( m_b2t_queue_name, MAX_MQ_NAME_LEN, "%s_b2t", conn_id);

    if( is_parent )
    {
        message_queue::remove("ffqs5hbKgFs_t2b"/*m_t2b_queue_name*/);
        message_queue::remove("ffqs5hbKgFs_b2t"/*m_b2t_queue_name*/);

        permissions perm;
        perm.set_unrestricted();
        m_t2b_queue.reset(new message_queue(create_only, "ffqs5hbKgFs_t2b"/*m_t2b_queue_name*/, MAX_MESSAGE_NUMBER, 24 /*sizeof(mq_item_t)*/, perm));
        m_b2t_queue.reset(new message_queue(create_only, "ffqs5hbKgFs_b2t"/*m_b2t_queue_name*/, MAX_MESSAGE_NUMBER, 24 /*sizeof(mq_item_t)*/, perm));
    }
    else
    {
        m_t2b_queue.reset(new message_queue(open_only, "ffqs5hbKgFs_t2b"/*m_t2b_queue_name*/));
        m_b2t_queue.reset(new message_queue(open_only, "ffqs5hbKgFs_b2t"/*m_b2t_queue_name*/));
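        // get_max_msg_size() returns a size type, so cast it to match the format specifier
        printf( "t2b max msg size = %lu\n", (unsigned long)m_t2b_queue->get_max_msg_size() );
        printf( "b2t max msg size = %lu\n", (unsigned long)m_b2t_queue->get_max_msg_size() );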
    }    
}

The variables in the code above have been replaced with hardcoded values to make it clearer.

The parent process creates the sink instance like this:

sink parent( "ffqs5hbKgFs", true);

This way, I expect the parent process to create two message_queue objects internally: one for reading and one for writing.

The parent process then creates the child process, which creates its sink instance like this:

sink child( "ffqs5hbKgFs", false);

I expect the child process to open the two existing message_queue objects created by the parent process and use one of them to send messages to the parent.

The problem is that the message_queue objects in the child process open successfully, but their max_msg_size is zero.

t2b max msg size = 0
b2t max msg size = 0

This causes an error when the child process tries to send a message through the message_queue.

Question: Does this mean the message_queue has to be created by the process that writes to it?

In my scenario, I expect the message_queue to always be created by the parent process, so that when a child process crashes, a new child process can attach to the existing message_queue and resume the job.


Reason: I finally found the cause: one process is running as x86 and the other as x64.
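
A likely explanation, stated here as an assumption rather than something confirmed above, is that the queue's internal header uses pointer-sized fields, so a 32-bit and a 64-bit build disagree about its layout. The sketch below shows one way to make such a mismatch fail loudly: each build reports its pointer width and tags the queue name with it, so a mismatched process fails to open the queue instead of reading a garbage size. The helpers pointer_bits and tagged_queue_name are hypothetical names, not part of Boost.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: pointer width of the current build (32 or 64 on common targets).
inline unsigned pointer_bits() {
    return static_cast<unsigned>(sizeof(void*) * 8);
}

// Hypothetical helper: append the bitness to a queue name, e.g. "name_t2b_64".
// A 32-bit and a 64-bit process then look for different names, so open_only
// throws in the mismatched process instead of misreading the queue header.
inline std::string tagged_queue_name(const std::string& base, unsigned bits) {
    assert(bits == 32 || bits == 64);
    return base + "_" + std::to_string(bits);
}
```

Both processes would pass tagged_queue_name(m_t2b_queue_name, pointer_bits()) to the message_queue constructor. This only detects the mismatch; the two processes still need to be built for the same architecture to share a queue.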


Solution

  • You need to synchronize the two processes around the queues' creation. Because the queues are named objects backed by the filesystem, you are in effect synchronizing filesystem access.

    Here you can see the failure being reproduced:


    int main() {
        std::cout << "Before: " << getpid() << "\n";
    
        if (int child = fork()) {
            std::cout << "Parent: " << getpid() << "\n";
            sink parent("ffqs5hbKgFs", true);
    
            int status;
            waitpid(child, &status, 0);
        } else {
            std::cout << "Child: " << getpid() << "\n";
            sink child("ffqs5hbKgFs", false);
        }
    }
    

    Prints

    terminate called after throwing an instance of 'boost::interprocess::interprocess_exception'
    what():  No such file or directory
    

    A simple sleep in the child confirms that this is a race condition: without it, the child tries to open the queues while the parent is still removing and re-creating them.

    } else {
        std::cout << "Child: " << getpid() << "\n";
        sleep(1); // one second
        sink child("ffqs5hbKgFs", false);
    }
    

    Prints, e.g.:

    Before: 3318
    Child: 3319
    t2b max msg size = 24
    b2t max msg size = 24
    Parent: 3318
    

    Further reading

    For proper synchronization solutions see the docs:

    As mentioned before, the ability to shared memory between processes through memory mapped files or shared memory objects is not very useful if the access to that memory can't be effectively synchronized. This is the same problem that happens with thread-synchronization mechanisms, where heap memory and global variables are shared between threads, but the access to these resources needs to be synchronized typically through mutex and condition variables. Boost.Threads implements these synchronization utilities between threads inside the same process. Boost.Interprocess implements similar mechanisms to synchronize threads from different processes
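
    As a lighter alternative to a fixed sleep, the child can retry the open until it succeeds or a limit is reached. The following is a minimal sketch under stated assumptions: retry_open is a hypothetical helper, not part of Boost, and it relies only on the fact that boost::interprocess::interprocess_exception derives from std::exception.

```cpp
#include <cassert>
#include <chrono>
#include <exception>
#include <thread>

// Hypothetical helper (not part of Boost): call `open` until it stops throwing,
// sleeping `delay` between attempts. After `attempts` failed tries the last
// exception is rethrown to the caller.
template <class Open>
auto retry_open(Open open, int attempts, std::chrono::milliseconds delay)
    -> decltype(open())
{
    assert(attempts > 0);
    for (int i = 1; ; ++i) {
        try {
            return open();              // success: hand back whatever open() produced
        } catch (const std::exception&) {
            if (i >= attempts) throw;   // give up: propagate the last error
            std::this_thread::sleep_for(delay);
        }
    }
}
```

    In the child branch of sink, the call could look like m_t2b_queue = retry_open([&] { return mq_ptr(new bip::message_queue(bip::open_only, m_t2b_queue_name.c_str())); }, 50, std::chrono::milliseconds(100)); where the attempt count and delay are arbitrary. Retrying does not protect against the child opening a stale queue just before the parent removes it, so constructing the parent's sink before calling fork() remains the simplest fix.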

    Full demo


    #include <boost/shared_ptr.hpp>
    #include <boost/make_shared.hpp>
    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <iostream>
    #include <string>
    
    namespace bip = boost::interprocess;
    
    typedef boost::shared_ptr<boost::interprocess::message_queue> mq_ptr;
    
    struct mq_item_t {
        char data[24];
    };
    
    struct sink {
        std::string m_t2b_queue_name;
        std::string m_b2t_queue_name;
        mq_ptr m_t2b_queue;
        mq_ptr m_b2t_queue;
    
        sink(const char * conn_id, bool is_parent)
            : 
            m_t2b_queue_name(conn_id + std::string("_t2b")),
            m_b2t_queue_name(conn_id + std::string("_b2t"))
        {
            const int MAX_MESSAGE_NUMBER = 100000;
    
            if( is_parent )
            {
                bip::message_queue::remove(m_t2b_queue_name.c_str());
                bip::message_queue::remove(m_b2t_queue_name.c_str());
    
                bip::permissions perm;
                perm.set_unrestricted();
                m_t2b_queue.reset(new bip::message_queue(bip::create_only, m_t2b_queue_name.c_str(), MAX_MESSAGE_NUMBER, sizeof(mq_item_t), perm));
                m_b2t_queue.reset(new bip::message_queue(bip::create_only, m_b2t_queue_name.c_str(), MAX_MESSAGE_NUMBER, sizeof(mq_item_t), perm));
            }
            else
            {
                m_t2b_queue.reset(new bip::message_queue(bip::open_only, m_t2b_queue_name.c_str()));
                m_b2t_queue.reset(new bip::message_queue(bip::open_only, m_b2t_queue_name.c_str()));
                std::cout << "t2b max msg size = " << m_t2b_queue->get_max_msg_size() << "\n";
                std::cout << "b2t max msg size = " << m_b2t_queue->get_max_msg_size() << "\n";
            }    
        }
    };
    
    #include <sys/types.h>
    #include <sys/wait.h>
    #include <unistd.h> // fork, getpid, sleep
    
    int main() {
        std::cout << "Before: " << getpid() << "\n";
    
        if (int child = fork()) {
            std::cout << "Parent: " << getpid() << "\n";
            sink parent("ffqs5hbKgFs", true);
    
            int status;
            waitpid(child, &status, 0);
        } else {
            std::cout << "Child: " << getpid() << "\n";
            sleep(1); // one second
            sink child("ffqs5hbKgFs", false);
        }
    }