Tags: c++, multithreading, pipe, file-descriptor, android-looper

Creating a dispatch queue / thread handler in C++ with pipes: FIFOs overfilling


Threads are expensive to create, so a pool of threads is often reused for asynchronous tasks. A task is packaged up and then "posted" to a broker, which runs it on the next available thread.

This is the idea behind dispatch queues (e.g. Apple's Grand Central Dispatch) and thread handlers (e.g. Android's Looper mechanism).

Right now, I'm trying to roll my own. In fact, I'm plugging a gap in Android whereby there is an API for posting tasks in Java, but not in the native NDK. However, I'm keeping this question platform independent where I can.

Pipes are the ideal choice for my scenario. I can easily poll the file descriptor of the read-end of a pipe(2) on my worker thread, and enqueue tasks from any other thread by writing to the write-end. Here's what that looks like:

int taskRead, taskWrite;

void setup() {
    // Create the pipe
    int taskPipe[2];
    ::pipe(taskPipe);
    taskRead = taskPipe[0];
    taskWrite = taskPipe[1];

    // Set up a routine that is called when taskRead reports new data
    function_that_polls_file_descriptor(taskRead, []() {
        // Read the callback data
        std::function<void(void)>* taskPtr;
        ::read(taskRead, &taskPtr, sizeof(taskPtr));

        // Run the task - this is unsafe! See below.
        (*taskPtr)();

        // Clean up
        delete taskPtr;
    });
}

void post(const std::function<void(void)>& task) {
    // Copy the function onto the heap
    auto* taskPtr = new std::function<void(void)>(task);

    // Write the pointer to the pipe - this may block if the FIFO is full!
    ::write(taskWrite, &taskPtr, sizeof(taskPtr));
}

This code copies a std::function onto the heap and writes the pointer to the pipe. function_that_polls_file_descriptor then calls the provided lambda, which reads the pointer back from the pipe and executes the function. Note that there are no safety checks in this example: return values are ignored, and an empty std::function would throw when called.
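For reference, the missing checks on the reading side could look roughly like the sketch below. The names Task, readTask and runTask are made up for illustration and are not part of the code above; the sketch assumes every writer sends exactly one pointer per write(2) call.

```cpp
#include <cassert>
#include <cerrno>
#include <functional>
#include <memory>
#include <unistd.h>

using Task = std::function<void(void)>;

// Hypothetical helper: reads one task pointer from fd and takes ownership
// of it. Returns null on end-of-file, a short read, or a read error.
std::unique_ptr<Task> readTask(int fd) {
    assert(fd >= 0);
    Task* raw = nullptr;
    for (;;) {
        ssize_t rc = ::read(fd, &raw, sizeof(raw));
        if (rc == static_cast<ssize_t>(sizeof(raw))) {
            return std::unique_ptr<Task>(raw);
        }
        if (rc < 0 && errno == EINTR) {
            continue;  // interrupted by a signal, try again
        }
        return nullptr;
    }
}

// Hypothetical helper: runs the task unless the pointer is null or the
// std::function is empty. The task is freed when the unique_ptr goes away.
bool runTask(std::unique_ptr<Task> task) {
    if (!task || !*task) {
        return false;
    }
    (*task)();
    return true;
}
```

With these two helpers, the body of the polling callback would reduce to runTask(readTask(taskRead)).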

The pipe-based queue works well most of the time, but there is one major drawback. Pipes have a limited capacity, and once the pipe is full, calls to post() block until the reader drains it. That is not a problem in itself, until post() is called from within a task.

auto evil = []() {
    // Post a new (no-op) task back onto the queue
    post([]() {});
    // Not enough new tasks, let's make more!
    for (int i = 0; i < 3; i++) {
        post([]() {});
    }

    // Now each time this task runs, 4 more tasks are added to the queue.
};

post(evil);
post(evil);
...

If this happens, then the worker thread will be blocked, waiting to write to the pipe. But the pipe's FIFO is full, and the worker thread is not reading anything from it, so the entire system is in deadlock.
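To get a feel for how many tasks fit before that happens, here is a minimal sketch that fills a fresh, non-blocking pipe with pointer-sized writes until write(2) refuses. The helper name countPointerSlots is hypothetical, and the result is platform-dependent (on Linux the default pipe capacity is 64 KiB).

```cpp
#include <cassert>
#include <cerrno>
#include <fcntl.h>
#include <unistd.h>

// Hypothetical helper: returns how many pointer-sized writes fit into a
// fresh pipe before it is full, or -1 if the pipe could not be set up.
long countPointerSlots() {
    int fds[2];
    if (::pipe(fds) != 0) {
        return -1;
    }

    // Non-blocking write end: a full pipe yields EAGAIN instead of hanging.
    int flags = ::fcntl(fds[1], F_GETFL, 0);
    if (flags < 0 || ::fcntl(fds[1], F_SETFL, flags | O_NONBLOCK) < 0) {
        ::close(fds[0]);
        ::close(fds[1]);
        return -1;
    }

    void* dummy = nullptr;
    long slots = 0;
    for (;;) {
        ssize_t rc = ::write(fds[1], &dummy, sizeof(dummy));
        // Writes of at most PIPE_BUF bytes are atomic, so never partial.
        assert(rc == -1 || rc == static_cast<ssize_t>(sizeof(dummy)));
        if (rc > 0) {
            ++slots;
            continue;
        }
        if (errno == EINTR) {
            continue;  // interrupted by a signal, try again
        }
        break;  // EAGAIN/EWOULDBLOCK (pipe full) or another error
    }

    ::close(fds[0]);
    ::close(fds[1]);
    return slots;
}
```

Whatever number this reports is the most tasks that can be queued without a reader; one more post() from the worker thread produces the deadlock described above.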

What can be done to ensure that calls to post() emanating from the worker thread always succeed, so that the worker can keep processing the queue even when the pipe is full?


Solution

  • Thanks to all the comments and other answers in this post, I now have a working solution to this problem.

    The trick I've employed is to prioritise worker threads by checking which thread is calling post(). Here is the rough algorithm:

    pipe ← NON-BLOCKING-PIPE()
    overflow ← Ø
    POST(task)
        success ← WRITE(task, pipe)
        IF NOT success THEN
            IF THREAD-IS-WORKER() THEN
                overflow ← overflow ∪ {task}
            ELSE
                WAIT(pipe)
                POST(task)
    

    Then on the worker thread:

    LOOP FOREVER
        task ← READ(pipe)
        RUN(task)
    
        FOR EACH overtask ∈ overflow
            RUN(overtask)
    
        overflow ← Ø
    

    The wait is performed with pselect(2), adapted from the answer by @Sigismondo.
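As a standalone illustration of that wait, the sketch below blocks until a descriptor becomes writable. The helper name waitUntilWritable is made up for this example; it assumes the descriptor is small enough to fit into an fd_set.

```cpp
#include <cassert>
#include <cerrno>
#include <sys/select.h>
#include <unistd.h>

// Hypothetical helper: blocks until fd is writable. Returns false if fd
// cannot be placed in an fd_set or if pselect(2) fails.
bool waitUntilWritable(int fd) {
    if (fd < 0 || fd >= FD_SETSIZE) {
        return false;
    }
    for (;;) {
        fd_set writeFds;
        FD_ZERO(&writeFds);
        FD_SET(fd, &writeFds);

        // nfds is the highest descriptor in any set plus one. A null
        // timeout waits indefinitely; a null sigmask leaves signals as-is.
        int rc = ::pselect(fd + 1, nullptr, &writeFds, nullptr, nullptr, nullptr);
        if (rc > 0) {
            assert(FD_ISSET(fd, &writeFds));  // it is the only fd in the set
            return true;
        }
        if (rc < 0 && errno != EINTR) {
            return false;
        }
        // Interrupted by a signal: rebuild the set and wait again.
    }
}
```

Being reported as writable only means the pipe had room at that moment; another thread can fill it again first, so the write still has to be retried and checked.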

    Here's the algorithm implemented in my original code example that will work for a single worker thread (although I haven't tested it after copy-paste). It can be extended to work for a thread pool by having a separate overflow queue for each thread.

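    #include <cerrno>
    #include <fcntl.h>
    #include <functional>
    #include <queue>
    #include <sys/select.h>
    #include <thread>
    #include <unistd.h>

    int taskRead, taskWrite;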
    
    // These variables are only allowed to be modified by the worker thread
    std::thread::id workerId;
    std::queue<std::function<void(void)>*> overflow;
    bool overflowInUse;
    
    void setup() {
        int taskPipe[2];
        ::pipe(taskPipe);
        taskRead = taskPipe[0];
        taskWrite = taskPipe[1];
    
        // Make the pipe non-blocking to check pipe overflows manually
        ::fcntl(taskWrite, F_SETFL, ::fcntl(taskWrite, F_GETFL, 0) | O_NONBLOCK);
    
        // Save the ID of this worker thread to compare later
        workerId = std::this_thread::get_id();
        overflowInUse = false;
    
        function_that_polls_file_descriptor(taskRead, []() {
            // Read the callback data
            std::function<void(void)>* taskPtr;
            ::read(taskRead, &taskPtr, sizeof(taskPtr));
    
            // Run the task
            (*taskPtr)();
            delete taskPtr;
    
            // Run any tasks that were posted to the overflow
            while (!overflow.empty()) {
                taskPtr = overflow.front();
                overflow.pop();
    
                (*taskPtr)();
                delete taskPtr;
            }
    
            // Release the overflow mechanism if applicable
            overflowInUse = false;
        });
    }
    
    bool write(std::function<void(void)>* taskPtr, bool blocking = true) {
        ssize_t rc = ::write(taskWrite, &taskPtr, sizeof(taskPtr));
    
        // Failure handling
        if (rc < 0) {
            // Save errno before any other call can overwrite it
            int err = errno;

            // If blocking is allowed, wait for the pipe to become writable
            if ((err == EAGAIN || err == EWOULDBLOCK) && blocking) {
                fd_set fds;
                FD_ZERO(&fds);
                FD_SET(taskWrite, &fds);

                // nfds must be the highest descriptor in the set plus one
                ::pselect(taskWrite + 1, nullptr, &fds, nullptr, nullptr, nullptr);

                // Try again
                return write(taskPtr);
            }
    
            // Otherwise return false
            return false;
        }
    
        return true;
    }
    
    void post(const std::function<void(void)>& task) {
        auto* taskPtr = new std::function<void(void)>(task);
    
        if (std::this_thread::get_id() == workerId) {
            // The worker thread gets 1st-class treatment.
            // It won't be blocked if the pipe is full, instead
            // using an overflow queue until the overflow has been cleared.
            if (!overflowInUse) {
                bool success = write(taskPtr, false);
                if (!success) {
                    overflow.push(taskPtr);
                    overflowInUse = true;
                }
            } else {
                overflow.push(taskPtr);
            }
        } else {
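            // Other threads block until the pipe has room. If the write
            // fails outright, free the task instead of leaking it.
            if (!write(taskPtr)) {
                delete taskPtr;
            }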
        }
    }
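For the thread-pool extension mentioned earlier, one possible way to give each worker its own overflow queue is thread_local storage. This is an untested sketch of the idea rather than part of the solution above; tlsOverflow, tlsIsWorker, deferLocally and drainLocalOverflow are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <queue>
#include <utility>

using Task = std::function<void(void)>;

// One overflow queue per thread. Each queue is only ever touched by the
// thread that owns it, so no locking is needed.
thread_local std::queue<Task> tlsOverflow;

// Each worker sets this once at startup; it replaces the workerId comparison.
thread_local bool tlsIsWorker = false;

// Called on a worker thread when the non-blocking pipe write has failed.
void deferLocally(Task task) {
    assert(tlsIsWorker);
    tlsOverflow.push(std::move(task));
}

// Runs every deferred task on the calling thread and returns how many ran.
// Tasks may defer further tasks while draining; the loop picks those up too.
std::size_t drainLocalOverflow() {
    std::size_t ran = 0;
    while (!tlsOverflow.empty()) {
        Task task = std::move(tlsOverflow.front());
        tlsOverflow.pop();
        task();
        ++ran;
    }
    return ran;
}
```

Each worker would call drainLocalOverflow() after every task it reads from the pipe, in the same place where the single-worker version empties its overflow queue.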