Search code examples
c++mutexcondition-variableunique-lock

Trying to write asynchronous I/O in C++ using locks and condition variables. This code calls terminate on the first lock() why?


We were trying to create C++ code that would read a block from a file, and start a thread to asynchronously read the next block while processing the first block. We started with condition_variable, but it was crashing so we went with straight locks. The program dies on the first readLock.lock() Ok, as some comments explained, the following code is wrong because we are not using RAII. Corrected code follows after, with delays put in for each function to force bugs. It still crashes though not on the first lock.

#include <fcntl.h>
#include <unistd.h>
#include <condition_variable>
#include <iostream>
#include <thread>
using namespace std;
using namespace std::chrono_literals;
constexpr uint32_t blockSize = 32768;
char* buf1;
char* buf2;
char* processbuf = buf1;
char* readbuf = buf2;

mutex readMutex;
mutex procMutex;
//condition_variable readCV;
//condition_variable procCV;
int fh;
int bytesRead;

std::unique_lock<std::mutex> readLock;
std::unique_lock<std::mutex> procLock;

void nextRead() {
  while (true) {
    readLock.lock();
    bytesRead = read(fh, readbuf, blockSize);
    for (int i = 0; i < 5; i++) {
      cerr << "reading..." << endl;
      usleep(100000);
      readLock.unlock();
    }
    cerr << "notifying..." << endl;
    readLock.unlock();
    if (bytesRead != blockSize)  // last time, end here!
      return;
      procLock.lock();
        procLock.unlock();
  }
}

void process(char* buf, uint32_t bytesRead) { cout << "hit it!" << endl; }

int thready() {
  buf1 = new char[blockSize];
  buf2 = new char[blockSize];
  fh = open("bigfile.dat", O_RDONLY);
  if (fh < 0) {
        throw std::runtime_error("cannot open file!");
  }
  int currentBytesRead = read(fh, buf1, blockSize);
  thread reader(nextRead);
  process(processbuf, currentBytesRead);
  while (true) {
    readLock.lock();
    cout << "back from wait" << endl;
    swap(processbuf, readbuf);  // switch to other buffer for next time
    currentBytesRead =
        bytesRead;  // copy locally so thread can do the other one
    // TODO: is the above a problem? what if
    readLock.unlock();
        procLock.lock();
    process(processbuf, currentBytesRead);
    procLock.unlock();
  }
  reader.join();

  delete[] buf1;
  delete[] buf2;
}
int main() {
  try {
    thready();
  }catch(std::exception& e) {
    cerr << e.what() << '\n';
  }
  return 0;
}

Here is the corrected code using RAII which still does not work. Now it terminates on the 3rd read? The file is 1Mb of zeros. I am considering changing block size to 1 byte and having the file be "123456789" for purposes of easy testing :

#include <fcntl.h>
#include <unistd.h>
#include <condition_variable>
#include <iostream>
#include <thread>
using namespace std;
using namespace std::chrono_literals;

constexpr uint32_t blockSize = 32768;
char* buf1;
char* buf2;
char* processbuf = buf1;
char* readbuf = buf2;

mutex readMutex;
mutex procMutex;
//condition_variable readCV;
//condition_variable procCV;
int fh;
int bytesRead;

void nextRead() {
    while (true) {
        {
    unique_lock<mutex> readLock(readMutex);
            bytesRead = read(fh, readbuf, blockSize);
            for (int i = 0; i < 5; i++) {
                cerr << "reading..." << endl;
                usleep(100000);
                readLock.unlock();
            }
            cerr << "notifying..." << endl;
        }
        if (bytesRead != blockSize)  // last time, end here!
            return;
        // wait for process to complete
        unique_lock<mutex> procLock(procMutex);
    }
}

void process(char* buf, uint32_t bytesRead) {
    cout << "processing..." << endl;
    usleep(100000);
}
int thready() {
    buf1 = new char[blockSize];
    buf2 = new char[blockSize];
    fh = open("bigfile.dat", O_RDONLY);
    if (fh < 0) {
        throw std::runtime_error("cannot open file!");
    }
    int currentBytesRead = read(fh, buf1, blockSize);
    thread reader(nextRead);
    process(processbuf, currentBytesRead);
    while (true) {
        {
      unique_lock<mutex> readLock(readMutex);
          cout << "back from wait" << endl;
          swap(processbuf, readbuf);  // switch to other buffer for next time
          currentBytesRead = bytesRead;  // create local copy
            // TODO: is the above a problem? what if
        }
      unique_lock<mutex> procLock(procMutex);
        process(processbuf, currentBytesRead);
    }
    reader.join();

    delete[] buf1;
    delete[] buf2;
}
int main() {
    try {
        thready();
    } catch(std::exception& e) {
        cerr << e.what() << '\n';
    }
    return 0;
}

The output is:

processing...
reading...
reading...back from wait
processing...

back from wait
processing...
terminate called after throwing an instance of 'std::system_error'
  what():  Operation not permitted
Aborted (core dumped)

Solution

  • Your mutexes are shared between threads as globals. That's fine. But your locks need to be local variables owned by a single thread. Use the fact that unique_lock auto locks the mutex in its constructor and auto-releases it its destructor. Here's a modificaton of your code:

    void nextRead() {
      while (true) {
    
        {
            // acquire the read lock  
            std::unique_lock<std::mutex> readLock(readMutex);
    
            bytesRead = read(fh, readbuf, blockSize);
            for (int i = 0; i < 5; i++) {
                cerr << "reading..." << endl;
                usleep(100000);
            }
    
            // when readLock goes out of scope, the mutex associated with it is unlocked
        }
    
        cerr << "notifying..." << endl;
    
        {
            // acquire the process lock
            std::unique_lock<std::mutex> procLock(procMutex); 
    
            if (bytesRead != blockSize)  // last time, end here!
                return;
    
            // procMutex gets unlocked inmplicitly as procLock goes out of scope
        }
    }
    

    You'll need to make similar changes in your thready function.