Search code examples
c++boostsignalsmessage-queue

How can I use a signal to interrupt Boost message queue send & receive calls that are blocking?


I have a process that uses a Boost message queue. When it is blocked in either send or receive because the queue size limit has been reached and I send it a signal, the function call seems to remain blocked. I expected the call to be cancelled or to raise an exception, but it doesn't behave that way. How can I interrupt the send or receive function call?

#include <boost/interprocess/ipc/message_queue.hpp>
#include <signal.h>
#include <iostream>
#include <vector>

using namespace boost::interprocess;

// Exit-request flag: set by sig_handler, polled by the send loop in main().
// It must be volatile: an object written from a signal handler and read by
// the interrupted code is only well-defined as `volatile sig_atomic_t`.
static volatile sig_atomic_t do_exit = 0;

// SIGINT handler: reports the signal number and asks the send loop to stop.
void sig_handler(int sig)
{
  // NOTE: printf is not async-signal-safe; it is kept here only as a
  // diagnostic for this demo. Production handlers should just set the flag.
  printf("signal %d\n", sig);
  do_exit = 1;
}

int main ()
{
  signal(SIGINT, sig_handler);

  try{
    //Erase previous message queue
    message_queue::remove("message_queue");

    //Create a message_queue.
    message_queue mq
        (create_only               //only create
            ,"message_queue"           //name
            ,5                       //max message number
            ,sizeof(int)               //max message size
        );

    //Send 100 numbers
    for(int i = 0; i < 100 && !do_exit; ++i){
      mq.send(&i, sizeof(i), 0);
      printf("%i\n", i);
    }
    printf("finished\n");
  }
  catch(interprocess_exception &ex){
    std::cout << ex.what() << std::endl;
    return 1;
  }
  catch(...) {
    std:: cout << "Exception" << std::endl;
  }

  return 0;
}

Solution

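  • The way to do this is to use the timed interfaces (e.g. `timed_send`) with a short timeout, and to re-check the exit flag every time the call times out: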

    // Send each number with a short deadline, retrying until it is accepted
    // or the signal handler has requested exit.
    for (int i = 0; i < 100 && !do_exit; ++i) {
        bool sent = false;
        while (!sent && !do_exit) {
            // Returns false on timeout, which gives us a chance to see do_exit.
            sent = mq.timed_send(&i, sizeof(i), 0, now() + 10ms);
        }
        if (sent) {
            printf("%i\n", i);
        }

        sleep_for(50ms);
    }
    

    E.g.:

    #include <boost/interprocess/ipc/message_queue.hpp>
    #include <iostream>
    #include <signal.h>
    #include <vector>
    #include <chrono>
    #include <thread>
    
    using namespace std::chrono_literals;
    namespace bip = boost::interprocess;
    
    // Exit-request flag: set by sig_handler, polled by the send loops in main().
    // It must be volatile: an object written from a signal handler and read by
    // the interrupted code is only well-defined as `volatile sig_atomic_t`.
    static volatile sig_atomic_t do_exit = 0;
    
    // SIGINT handler: reports the signal number and asks the send loop to stop.
    void sig_handler(int sig)
    {
        // NOTE: printf is not async-signal-safe; it is kept here only as a
        // diagnostic for this demo. Production handlers should just set the flag.
        printf("signal %d\n", sig);
        do_exit = 1;
    }
    
    // Creates a 5-slot message queue and pushes up to 100 integers into it,
    // using timed sends so that a SIGINT can stop the program while the queue
    // is full. Returns 1 on an interprocess_exception, 2 on any other exception.
    int main()
    {
        using std::this_thread::sleep_for;
        auto now = std::chrono::steady_clock::now;
        signal(SIGINT, sig_handler);
    
        try {
            // Start from a clean slate: drop any queue left by a previous run.
            bip::message_queue::remove("message_queue");
    
            bip::message_queue mq(bip::create_only, // only create
                                  "message_queue",  // name
                                  5,                // max message number
                                  sizeof(int)       // max message size
            );
    
            // Send 100 numbers, retrying each one until it is accepted or the
            // signal handler has requested exit.
            for (int i = 0; i < 100 && !do_exit; ++i) {
                bool sent = false;
                while (!sent && !do_exit) {
                    // NOTE(review): confirm that the Boost version in use
                    // interprets a steady_clock deadline as intended here.
                    sent = mq.timed_send(&i, sizeof(i), 0, now() + 10ms);
                }
                if (sent) {
                    printf("%i\n", i);
                }
    
                sleep_for(50ms);
            }
            printf("finished\n");
        } catch (bip::interprocess_exception const& ex) {
            std::cout << ex.what() << std::endl;
            return 1;
        } catch (...) {
            std::cout << "Exception" << std::endl;
            return 2;
        }
    }
    

    Demo

    enter image description here