Is boost interprocess managed_shared_memory thread safe?


I have two processes P1 and P2 implementing IPC using boost::interprocess::managed_shared_memory. Both use the following snippet of code:

namespace ipc = boost::interprocess;

// Open a shared memory segment with given name and size
ipc::managed_shared_memory segment_(ipc::open_or_create, "shared_memory_segment", 1024);

// Immediately proceed to create objects using find_or_construct
auto heartbeat_p1 = segment_.find_or_construct<std::atomic<int64_t>>("heartbeat_p1")(0);
// .. other shared objects

// And then proceed to use them

I suspect that using open_or_create this way is not safe when the two processes race. Does it require external synchronization?

Either of two things might go wrong:

  1. Both P1 and P2 enter the constructor of segment_ at the same time. (Edit: this case does not introduce a race condition, because the calls inside open() consist of OS system calls that the OS itself serializes, much like the open syscall. I am leaving the point in for completeness.) open_or_create is itself logic that can be represented as:

try {
    create();
}
catch (...) {
    if (already_exists_error) {
        open();   // attach to the segment another process created
    }
    else {
        throw;    // rethrow any other error
    }
}

Could there be a race condition if both try to create() at the same time?

  2. P1 is still inside the constructor and has not finished building the internal data structures that the managed memory segment needs in order to function. Meanwhile P2 starts, goes straight to open(), and attaches to a segment that has not been completely created. Won't P2 be left with the shared memory object in an undefined state?

I have seen my setup behave strangely: the objects in the Boost managed shared memory are not really "shared" across the processes, and no exception is thrown either.

Reference for try-catch block: Is there a better way to check for the existence of a boost shared memory segment?.

The question "c++ Synchronize shared memory when reading" also mentions using external synchronization when reading from another process.

Is my understanding correct that we need external synchronization when using open_or_create from two different processes? And if so, how do I synchronize properly so that both can use open_or_create?


Solution

    I went through the Boost source code and made the following observations:

    TL;DR: No, we do not need any external synchronization here. The Boost source ensures that all the calls are synchronized, either by OS system calls or by atomic reads and compare-and-swap writes to a state word (used as an enum) at the starting address of the shared memory segment.

    Longer version: If we work our way through the Boost managed_shared_memory source code, all the important bits can be found in the constructor of the following class:

    managed_open_or_create_impl.hpp:

    // boost::interprocess::managed_shared_memory derives from:
    // (unimportant arguments and templates are skipped for this answer)
    
    class managed_open_or_create_impl
    {
       managed_open_or_create_impl()
       {
          priv_open_or_create();
       }
    
       void priv_open_or_create()
       {
          bool created = false;
          // more code ...
          else { //DoOpenOrCreate
             created = this->do_create_else_open(dev, id, size, perm);
          }
    
          // map the device depending on whether this process created it
          if(created){
             this->do_map_after_create(dev, m_mapped_region, size, addr, construct_func);
          }
          else{
             this->do_map_after_open(dev, m_mapped_region, addr, construct_func, ronly, cow);
          }
       }
    };
    

    So we have 3 functions of interest here: do_create_else_open(), do_map_after_create(), do_map_after_open().

    Let's go through them one-by-one:

    template <class DeviceId>
       static bool do_create_else_open(DeviceAbstraction &dev, const DeviceId & id, std::size_t size, const permissions &perm)
       {
          spin_wait swait;
          unsigned tries = 0;
          while(1){
             BOOST_TRY{
                create_device<FileBased>(dev, id, size, perm, file_like_t());
                return true;
             }
             BOOST_CATCH(interprocess_exception &ex){
                #ifndef BOOST_NO_EXCEPTIONS
                if(ex.get_error_code() != already_exists_error){
                   BOOST_RETHROW
                }
                else if (++tries == MaxCreateOrOpenTries) {
                   //File existing when trying to create, but non-existing when
                   //trying to open, and tried MaxCreateOrOpenTries times. Something fishy
                   //is happening here and we can't solve it
                   throw interprocess_exception(error_info(corrupted_error));
                }
                else{
                   BOOST_TRY{
                      DeviceAbstraction tmp(open_only, id, read_write);
                      dev.swap(tmp);
                      return false;
                   }
                   BOOST_CATCH(interprocess_exception &e){
                      if(e.get_error_code() != not_found_error){
                         BOOST_RETHROW
                      }
                   }
                   BOOST_CATCH(...){
                      BOOST_RETHROW
                   } BOOST_CATCH_END
                }
                #endif   //#ifndef BOOST_NO_EXCEPTIONS
             }
             BOOST_CATCH(...){
                BOOST_RETHROW
             } BOOST_CATCH_END
             swait.yield();
          }
          return false;
       }
    
    1. This one is simple: it wraps the create_device call in a try-catch block inside a retry loop. create_device expands to shm_open(), which is atomic because it is an OS system call, similar to a file open. So when two processes race, one of them is bound to get already_exists_error and enter the catch block, where it simply attaches to the OS shared memory that already exists.

    2. The process that creates the segment then calls do_map_after_create(). The process that attaches calls do_map_after_open().
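The create-else-open loop described above can be sketched outside Boost with plain POSIX calls. This is only an illustration under stated assumptions: it uses open() on a regular file as a stand-in for shm_open() (both honor O_CREAT | O_EXCL), and the names OpenResult, create_else_open and the default of 20 tries are hypothetical, not taken from the Boost source.

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <stdexcept>
#include <string>
#include <system_error>
#include <unistd.h>

// Hypothetical result type: the descriptor plus whether this caller created the file.
struct OpenResult {
    int  fd;
    bool created;
};

// Try to create exclusively; if the file already exists, fall back to opening it.
// max_tries is an arbitrary illustrative bound, not a value taken from Boost.
inline OpenResult create_else_open(const std::string& path, int max_tries = 20)
{
    assert(max_tries > 0);
    for (int tries = 0; tries < max_tries; ++tries) {
        // With O_CREAT | O_EXCL the kernel lets exactly one racing caller succeed.
        int fd = ::open(path.c_str(), O_RDWR | O_CREAT | O_EXCL, 0600);
        if (fd >= 0) {
            return {fd, true};
        }
        if (errno != EEXIST) {
            throw std::system_error(errno, std::generic_category(), "create failed");
        }
        // Someone else created it first: attach to the existing file.
        fd = ::open(path.c_str(), O_RDWR);
        if (fd >= 0) {
            return {fd, false};
        }
        if (errno != ENOENT) {
            throw std::system_error(errno, std::generic_category(), "open failed");
        }
        // The file vanished between the two calls: go around and try again.
    }
    throw std::runtime_error("file kept appearing and disappearing");
}
```

Calling create_else_open twice on the same fresh path should report created == true for the first call and created == false for the second, which mirrors the true/false return value of do_create_else_open().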

    do_map_after_create():

    void do_map_after_create()
    {
          BOOST_TRY{
             //If this throws, we are lost
             truncate_device<FileBased>(dev, static_cast<offset_t>(size), file_like_t());
    
             //If the following throws, we will truncate the file to 1
             mapped_region region(dev, read_write, 0, 0, addr);
             boost::uint32_t *patomic_word = 0;  //avoid gcc warning
             patomic_word = static_cast<boost::uint32_t*>(region.get_address());
             boost::uint32_t previous = atomic_cas32(patomic_word, InitializingSegment, UninitializedSegment);
    
             if(previous == UninitializedSegment){
                BOOST_TRY{
                   construct_func( static_cast<char*>(region.get_address()) + ManagedOpenOrCreateUserOffset
                                  , size - ManagedOpenOrCreateUserOffset, true);
                   //All ok, just move resources to the external mapped region
                   final_region.swap(region);
                }
                BOOST_CATCH(...){
                   atomic_write32(patomic_word, CorruptedSegment);
                   BOOST_RETHROW
                } BOOST_CATCH_END
                atomic_write32(patomic_word, InitializedSegment);
             }
             else{
                atomic_write32(patomic_word, CorruptedSegment);
                throw interprocess_exception(error_info(corrupted_error));
             }
          }
          BOOST_CATCH(...){
             BOOST_TRY{
                truncate_device<FileBased>(dev, 1u, file_like_t());
             }
             BOOST_CATCH(...){
             }
             BOOST_CATCH_END
             BOOST_RETHROW
          }
          BOOST_CATCH_END
       }
    
    1. First, it truncates the shared memory to the requested size. It then takes the pointer to the start of the mapped segment and performs an atomic compare-and-swap on the first word, changing it from UninitializedSegment to InitializingSegment. Note that right after truncation the OS zero-fills the memory, so the word reads 0, which is the enum value UninitializedSegment.
    2. Once the word has been set to InitializingSegment, it constructs the objects needed for the segment to function. When that completes, it atomically updates the word to InitializedSegment. If construction throws, the word is set to CorruptedSegment instead.
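The creator-side handshake in the two steps above can be sketched with std::atomic. This is a simplified, in-process illustration, not the Boost code: the enum SegmentState, its numeric values other than 0 for the uninitialized state, and the function initialize_as_creator are assumptions made for the example. In real shared memory the atomic word would also have to be lock-free to be usable across processes.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <stdexcept>

// Illustrative state values; only "zero-filled means uninitialized" comes from the text above.
enum SegmentState : std::uint32_t {
    Uninitialized = 0,
    Initializing,
    Initialized,
    Corrupted
};

// Claim the segment with a compare-and-swap, run the construction step,
// then publish the result through the state word.
template <class ConstructFn>
void initialize_as_creator(std::atomic<std::uint32_t>& word, ConstructFn construct)
{
    std::uint32_t expected = Uninitialized;
    if (!word.compare_exchange_strong(expected, Initializing)) {
        // A freshly created segment must read as Uninitialized; anything else is an error.
        word.store(Corrupted);
        throw std::runtime_error("segment was not in the uninitialized state");
    }
    try {
        construct();  // build the segment's internal data structures
    } catch (...) {
        word.store(Corrupted);  // tell waiting openers that initialization failed
        throw;
    }
    word.store(Initialized);  // openers may proceed only from this point on
}
```

While construct() runs, any other observer sees Initializing, so it has no way to mistake a half-built segment for a finished one.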

    Now, do_map_after_open(), broken into two parts. Part 1:

    void do_map_after_open()
    {
          const usduration TimeoutSec(usduration_seconds(MaxInitializeTimeSec));
    
          if(FileBased){
             offset_t filesize = 0;
             spin_wait swait;
    
             //If a file device was used, the creator might be truncating the device, so wait
             //until the file size is enough to map the initial word
             ustime ustime_start = microsec_clock<ustime>::universal_time();
    
             while(1){
                if(!get_file_size(file_handle_from_mapping_handle(dev.get_mapping_handle()), filesize)){
                   error_info err = system_error_code();
                   throw interprocess_exception(err);
                }
                if (filesize != 0)
                   break;
                else {
                   //More than MaxZeroTruncateTimeSec seconds waiting to the creator
                   //to minimally increase the size of the file: something bad has happened
                   const usduration elapsed(microsec_clock<ustime>::universal_time() - ustime_start);
                   if (elapsed > TimeoutSec){
                      throw interprocess_exception(error_info(corrupted_error));
                   }
                   swait.yield();
                }
             }
             //The creator detected an error creating the file and signalled it with size 1
             if(filesize == 1){
                throw interprocess_exception(error_info(corrupted_error));
             }
          }
          // .. more code later
    }
    

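    In part 1:

    1. It checks whether the creator has already truncated the device opened via shm_open by polling the file size until it is non-zero, and then proceeds. The wait is bounded by time rather than by a retry count: it throws once more than TimeoutSec = MaxInitializeTimeSec (300u) seconds have elapsed. It also throws if the size is 1, which is how the creator signals that its initialization failed.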
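The size-polling wait from part 1 can be sketched as a small generic helper. This is an assumption-laden illustration: wait_for_nonzero_size is a hypothetical name, the size is obtained through a caller-supplied callable instead of a real file handle, and std::chrono::steady_clock replaces Boost's own clock utilities.

```cpp
#include <cassert>
#include <chrono>
#include <cstdint>
#include <stdexcept>
#include <thread>

// Poll get_size() until the creator has grown the device.
// Size 0 means "not truncated yet"; size 1 is the creator's failure signal.
template <class SizeFn>
std::uint64_t wait_for_nonzero_size(SizeFn get_size, std::chrono::milliseconds timeout)
{
    assert(timeout.count() >= 0);
    const auto start = std::chrono::steady_clock::now();
    for (;;) {
        const std::uint64_t size = get_size();
        if (size == 1) {
            throw std::runtime_error("creator signalled a failed initialization");
        }
        if (size != 0) {
            return size;  // large enough to map the state word
        }
        if (std::chrono::steady_clock::now() - start > timeout) {
            throw std::runtime_error("timed out waiting for the creator to truncate");
        }
        std::this_thread::yield();  // give the creator a chance to run
    }
}
```

In a real opener, get_size would wrap something like fstat() on the shared memory descriptor; here it is left abstract so the waiting logic can be read on its own.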

    Part 2:

    {
          // continued;
          mapped_region  region(dev, ronly ? read_only : (cow ? copy_on_write : read_write), 0, 0, addr);
    
          boost::uint32_t *patomic_word = static_cast<boost::uint32_t*>(region.get_address());
          boost::uint32_t value = atomic_read32(patomic_word);
    
          if (value != InitializedSegment){
             ustime ustime_start = microsec_clock<ustime>::universal_time();
             spin_wait swait;
             while ((value = atomic_read32(patomic_word)) != InitializedSegment){
                if(value == CorruptedSegment){
                   throw interprocess_exception(error_info(corrupted_error));
                }
                //More than MaxZeroTruncateTimeSec seconds waiting to the creator
                //to minimally increase the size of the file: something bad has happened
                const usduration elapsed(microsec_clock<ustime>::universal_time() - ustime_start);
                if (elapsed > TimeoutSec){
                   throw interprocess_exception(error_info(corrupted_error));
                }
                swait.yield();
             }
             //The size of the file might have grown while Uninitialized -> Initializing, so remap
             {
                mapped_region null_map;
                region.swap(null_map);
             }
             mapped_region  final_size_map(dev, ronly ? read_only : (cow ? copy_on_write : read_write), 0, 0, addr);
             final_size_map.swap(region);
          }
          construct_func( static_cast<char*>(region.get_address()) + ManagedOpenOrCreateUserOffset
                         , region.get_size() - ManagedOpenOrCreateUserOffset
                         , false);
          //All ok, just move resources to the external mapped region
          final_region.swap(region);
    }
    
    1. This part atomically reads the word at the starting address of the shared memory and waits until its value is InitializedSegment. It throws if the word becomes CorruptedSegment or if the wait exceeds TimeoutSec.
    2. Once the wait completes successfully, it creates its own mapped_region object, remapping first if it had to wait, since the file size might have grown in the meantime.
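The opener-side wait from part 2 can be sketched in the same spirit. Again this is a simplified illustration rather than the Boost implementation: SegmentState and wait_until_initialized are hypothetical names, the state word is an ordinary in-process std::atomic, and the timeout is passed in by the caller.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstdint>
#include <stdexcept>
#include <thread>

// Illustrative state values, chosen so that zero-filled memory reads as Uninitialized.
enum SegmentState : std::uint32_t {
    Uninitialized = 0,
    Initializing,
    Initialized,
    Corrupted
};

// Spin until the creator publishes Initialized; fail fast on Corrupted or on timeout.
inline void wait_until_initialized(const std::atomic<std::uint32_t>& word,
                                   std::chrono::milliseconds timeout)
{
    assert(timeout.count() >= 0);
    const auto start = std::chrono::steady_clock::now();
    std::uint32_t value;
    while ((value = word.load()) != Initialized) {
        if (value == Corrupted) {
            throw std::runtime_error("creator marked the segment as corrupted");
        }
        if (std::chrono::steady_clock::now() - start > timeout) {
            throw std::runtime_error("timed out waiting for initialization");
        }
        std::this_thread::yield();  // let the creator make progress
    }
}
```

An opener that returns normally from this function knows the creator has finished, which is the guarantee that makes external synchronization around open_or_create unnecessary.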

    Thus, all calls are synchronized either by OS system calls, or by the atomic reads and compare-and-swap enum writes into the starting address of the shared memory segment.

    Edit: I finally found out why my processes were not attaching to the same shared memory segment. systemd was deleting the shared memory resources of a user account on logout. Link: https://superuser.com/a/1179962/1818191