c++ asynchronous wait condition-variable

How to properly wait for condition variable in C++?


I'm trying to create an asynchronous I/O file reader in C++ under Linux. The example I have uses two buffers. The first read blocks. Then, on each pass through the main loop, I launch the I/O asynchronously and call process(), which simulates processing of the current block. When processing is done, we wait on the condition variable. The idea is that the asynchronous completion handler should notify the condition variable.

Unfortunately, the notify seems to happen before the wait, and apparently that is not how the condition variable's wait() function works. How should I rewrite the code so that the loop waits until the asynchronous I/O has completed?

#include <aio.h>
#include <fcntl.h>
#include <signal.h>
#include <unistd.h>

#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
#include <iostream>
#include <mutex>
#include <stdexcept>
#include <thread>

using namespace std;
using namespace std::chrono_literals;

constexpr uint32_t blockSize = 512;

mutex readMutex;

condition_variable cv;

int fh;
int bytesRead;

void process(char* buf, uint32_t bytesRead) {
  cout << "processing..." << endl;
  usleep(100000);
}

void aio_completion_handler(sigval_t sigval) {
  struct aiocb* req = (struct aiocb*)sigval.sival_ptr;

  // check whether asynch operation is complete
  if (aio_error(req) == 0) {
    int ret = aio_return(req);
    bytesRead = ret;  // aio_return() gives the bytes actually read; aio_nbytes is only the requested size
    cout << "ret == " << ret << endl;
    cout << (char*)req->aio_buf << endl;
  }
  {
    unique_lock<mutex> readLock(readMutex);
    cv.notify_one();
  }
}

void thready() {
  char* buf1 = new char[blockSize];
  char* buf2 = new char[blockSize];
  aiocb cb;
  char* processbuf = buf1;
  char* readbuf = buf2;
  fh = open("smallfile.dat", O_RDONLY);
  if (fh < 0) {
    throw std::runtime_error("cannot open file!");
  }

  memset(&cb, 0, sizeof(aiocb));
  cb.aio_fildes = fh;
  cb.aio_nbytes = blockSize;
  cb.aio_offset = 0;

  // Fill in callback information
  /*
  Using SIGEV_THREAD to request a thread callback function as a notification
  method
  */
  cb.aio_sigevent.sigev_notify_attributes = nullptr;
  cb.aio_sigevent.sigev_notify = SIGEV_THREAD;
  cb.aio_sigevent.sigev_notify_function = aio_completion_handler;
  /*
  The context to be transmitted is loaded into the handler (in this case, a
  reference to the aiocb request itself). In this handler, we simply refer to
  the arrived sigval pointer and use the AIO function to verify that the request
  has been completed.
  */
  cb.aio_sigevent.sigev_value.sival_ptr = &cb;

  int currentBytesRead = read(fh, buf1, blockSize);  // read the 1st block

  while (true) {
    cb.aio_buf = readbuf;
    aio_read(&cb);  // each next block is read asynchronously
    process(processbuf, currentBytesRead);  // process while waiting
    {
      unique_lock<mutex> readLock(readMutex);
      cv.wait(readLock);
    }
    currentBytesRead = bytesRead;  // make local copy of global modified by the async code
    if (currentBytesRead < blockSize) {
      break;  // last time, get out
    }
    cout << "back from wait" << endl;
    swap(processbuf, readbuf);     // switch to other buffer for next time
    currentBytesRead = bytesRead;  // create local copy
  }

  delete[] buf1;
  delete[] buf2;
}

int main() {
  try {
    thready();
  } catch (std::exception& e) {
    cerr << e.what() << '\n';
  }
  return 0;
}

Solution

  • A condition variable should generally be used for

    • waiting until it is possible that the predicate (for example a shared variable) has changed, and
    • notifying waiting threads that the predicate may have changed, so that waiting threads should check the predicate again.

    However, you seem to be attempting to use the state of the condition variable itself as the predicate. This is not how condition variables are supposed to be used. A condition variable has no memory: if notify_one() is called while no thread is waiting, the notification is lost, and a later wait() blocks until the next notification arrives. That is the race condition described in your question. Another reason to always check the predicate is that spurious wakeups are possible with condition variables.
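To illustrate why the predicate matters, here is a minimal sketch in which the notification is deliberately sent before anyone waits. The function name `early_notify_is_still_seen` and the variable `ready` are hypothetical and do not appear in the question. Because the event is recorded in a shared flag, the waiter still sees it even though the notification itself woke nobody.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>

// Hypothetical helper for illustration (not part of the question's code).
// The event is recorded in `ready` before any thread waits.
bool early_notify_is_still_seen() {
  std::mutex m;
  std::condition_variable cv;
  bool ready = false;  // the predicate, guarded by m

  {
    std::lock_guard<std::mutex> lock(m);
    ready = true;  // record the event under the mutex
  }
  cv.notify_one();  // no thread is waiting yet, so this wakes nobody

  std::unique_lock<std::mutex> lock(m);
  // The predicate overload checks `ready` before blocking, so it returns
  // true right away instead of waiting for a notification that has
  // already passed. The timeout is only a safety net for this sketch.
  return cv.wait_for(lock, std::chrono::milliseconds(100),
                     [&] { return ready; });
}
```

Calling `cv.wait(lock)` without a predicate at the same point would block, since the earlier notification is not stored anywhere.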

    In your case, it would probably be appropriate to create a shared variable

    bool operation_completed = false;
    

    and use that variable as the predicate for the condition variable. Access to that variable should always be controlled by the mutex.

    You can then change the lines

    {
        unique_lock<mutex> readLock(readMutex);
        cv.notify_one();
    }
    

    to

    {
        unique_lock<mutex> readLock(readMutex);
        operation_completed = true;
        cv.notify_one();
    }
    

    and change the lines

    {
        unique_lock<mutex> readLock(readMutex);
        cv.wait(readLock);
    }
    

    to:

    {
        unique_lock<mutex> readLock(readMutex);
        while ( !operation_completed )
            cv.wait(readLock);
    }
    

    Instead of

    while ( !operation_completed )
        cv.wait(readLock);
    

    you can also write

    cv.wait( readLock, []{ return operation_completed; } );
    

    which is equivalent. See the documentation of std::condition_variable::wait for further information.

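    Of course, operation_completed must also be set back to false while the mutex is locked, before the next completion is expected. For example, reset it right after the wait returns and before the next aio_read() is issued.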
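The whole pattern, including the reset, can be sketched as follows. This is an assumption-laden illustration rather than a drop-in fix: a `std::thread` stands in for the AIO completion callback so the example stays portable, and the names `Completion`, `signal`, `wait_and_reset` and `run_rounds` are hypothetical.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical wrapper bundling the mutex, the condition variable and the
// predicate, so that the flag is never touched without the lock.
struct Completion {
  std::mutex mtx;
  std::condition_variable cv;
  bool done = false;  // plays the role of operation_completed
  int result = 0;     // plays the role of bytesRead

  // Called by the completion side (the AIO handler in the question).
  void signal(int value) {
    {
      std::lock_guard<std::mutex> lock(mtx);
      result = value;
      done = true;
    }
    cv.notify_one();
  }

  // Called by the main loop: wait for completion, then re-arm the flag
  // while still holding the mutex.
  int wait_and_reset() {
    std::unique_lock<std::mutex> lock(mtx);
    cv.wait(lock, [this] { return done; });
    done = false;
    return result;
  }
};

// Runs `rounds` simulated operations and returns the sum of their results.
int run_rounds(int rounds) {
  Completion c;
  int sum = 0;
  for (int i = 1; i <= rounds; ++i) {
    // Stand-in for aio_read() plus its completion callback.
    std::thread worker([&c, i] { c.signal(i); });
    // ... process the current block here ...
    sum += c.wait_and_reset();
    worker.join();
  }
  return sum;
}
```

Here `signal` notifies after releasing the lock, which is a common variation; notifying while the lock is still held, as in the snippets above, is also correct. It does not matter whether the worker finishes before or after the main loop reaches `wait_and_reset`, because the flag carries the state.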