c++ winapi waitformultipleobjects anonymous-pipes

Using WaitForMultipleObjects on an anonymous pipe (MSDN)


I have a problem using WaitForMultipleObjects with an anonymous pipe. My goal is to wait for a pipe to be written and for another object at the same time. To be more precise, I am using pipes for local IPC. I saw an approach on Stack Overflow where you create an event handle with CreateEvent, set the event after every WriteFile on the pipe, and reset it after every ReadFile. Here is a simple test that shows this approach is not correct.

#include <windows.h>
#include <cstdio>   // printf
#include <iostream>

HANDLE event_on_pipe;
HANDLE endpoint_pipe[2];

DWORD WINAPI WThread_Pipe( LPVOID lpParam )
{
    ULONG buffer = 1234;
    DWORD bytes;
    int count = 10;
    while(count --)
    {
        WriteFile(endpoint_pipe[1],(char*)&buffer, sizeof(buffer),&bytes,NULL);
        // Setting an event that is already signaled has no additional effect.
        if ( !SetEvent(event_on_pipe) )
        {
            printf("SetEvent failed (%lu)\n", GetLastError());
            return 0;
        }
    }
    return 0;
}



int main()
{
    HANDLE Thread_Pipe;
    DWORD ThreadID_Pipe;
    event_on_pipe = CreateEvent(NULL,FALSE,FALSE,NULL);


    CreatePipe(&endpoint_pipe[0],&endpoint_pipe[1],NULL,0);

    
    HANDLE lphandles[1];

    lphandles[0] = event_on_pipe;

    Thread_Pipe = CreateThread(NULL,0,(LPTHREAD_START_ROUTINE) WThread_Pipe, NULL, 0, &ThreadID_Pipe);


    ULONG buffer;
    DWORD bytes;
    while(1)
    {
        DWORD obj = WaitForMultipleObjects(1,lphandles,FALSE,INFINITE);

        if(obj == WAIT_OBJECT_0) 
        {
            ReadFile(endpoint_pipe[0], &buffer,sizeof(ULONG),&bytes,NULL);
            ResetEvent(event_on_pipe);
            Sleep(1000);
        }


        std::cout << buffer<<std::endl;
    }




    return 0;
}

I expect to see 1234 printed 10 times on the console, but I see it only twice. The main problem is that while the reader sleeps for one second after the ReadFile operation, the writer thread writes to the pipe and sets the event 9 more times. That leaves 9 ReadFile operations pending, but since setting the event 9 times counts as a single set, WaitForMultipleObjects is signaled only once.

I want an efficient way to signal WaitForMultipleObjects when a pipe is written (for local inter-process communication). It doesn't need to be an anonymous pipe; it could be anything else that acts like a pipe for exchanging messages between threads. If someone knows how this could be done, please help.


Solution

  • As you've found, an Event has only two states: signaled and not signaled. It has no capability to keep track of how many times it's been signaled.

    Instead of an event, I'd use a semaphore, something like this:

    #include <windows.h>
    #include <iostream>
    
    HANDLE event_on_pipe;
    HANDLE endpoint_pipe[2];
    ULONG end = -1;
    
    DWORD WINAPI WThread_Pipe(LPVOID lpParam) {
    
        ULONG buffer = 1234;
    
        DWORD bytes;
        for (int count = 0; count < 10; count++) {
            WriteFile(endpoint_pipe[1], (char*)&buffer, sizeof(buffer), &bytes, NULL);
            ReleaseSemaphore(event_on_pipe, 1, NULL);
        }
        // write a unique value to the pipe to signal the end of data:
        WriteFile(endpoint_pipe[1], (char*)&end, sizeof(end), &bytes, NULL);
        ReleaseSemaphore(event_on_pipe, 1, NULL);
        return 0;
    }
    
    int main() {
        HANDLE Thread_Pipe;
        DWORD ThreadID_Pipe;
        event_on_pipe = CreateSemaphore(NULL, 0, 100, NULL);
    
        CreatePipe(&endpoint_pipe[0], &endpoint_pipe[1], NULL, 0);
    
        HANDLE lphandles[1];
    
        lphandles[0] = event_on_pipe;
    
        Thread_Pipe = CreateThread(NULL, 0, (LPTHREAD_START_ROUTINE)WThread_Pipe, NULL, 0, &ThreadID_Pipe);
    
        ULONG buffer;
        DWORD bytes;
        while (WAIT_OBJECT_0 == WaitForMultipleObjects(1, lphandles, FALSE, INFINITE)) {
            ReadFile(endpoint_pipe[0], &buffer, sizeof(buffer), &bytes, NULL);
            if (buffer == end) {
                break;
            }
            std::cout << buffer << std::endl;
            Sleep(1000);
        }
    
        return 0;
    }
    

    I've tried to keep most of the code pretty similar to how you had it, keeping changes to (more or less) a minimum necessary to get it to work.

    If I were starting from scratch, and just wanted to transmit a series of numbers (or other things, all of the same type) between threads in a single process, I'd probably use a thread-safe queue instead.

    If we wanted to use Windows primitives, the queue might look something like this:

    #ifndef QUEUE_H_INCLUDED
    #define QUEUE_H_INCLUDED
    
    #include <windows.h>
    
    template<class T, unsigned max = 256>
    class queue {
        HANDLE space_avail; // at least one slot empty
        HANDLE data_avail;  // at least one slot full
        CRITICAL_SECTION mutex; // protect buffer, in_pos, out_pos
    
        T buffer[max];
        long in_pos, out_pos;
    public:
        queue() : in_pos(0), out_pos(0) {
            space_avail = CreateSemaphore(NULL, max, max, NULL);
            data_avail = CreateSemaphore(NULL, 0, max, NULL);
            InitializeCriticalSection(&mutex);
        }
    
        void push(T data) {
            WaitForSingleObject(space_avail, INFINITE);
            EnterCriticalSection(&mutex);
            buffer[in_pos] = data;
            in_pos = (in_pos + 1) % max;
            LeaveCriticalSection(&mutex);
            ReleaseSemaphore(data_avail, 1, NULL);
        }
    
        bool pop(T &dest, unsigned maxWaitMs = 100) {
            if (WAIT_OBJECT_0 != WaitForSingleObject(data_avail, maxWaitMs)) {
                return false;
            }
            EnterCriticalSection(&mutex);
            dest = buffer[out_pos];
            out_pos = (out_pos + 1) % max;
            LeaveCriticalSection(&mutex);
            ReleaseSemaphore(space_avail, 1, NULL);
            return true;
        }
    
        ~queue() {
            DeleteCriticalSection(&mutex);
            CloseHandle(data_avail);
            CloseHandle(space_avail);
        }
    };
    
    #endif
    

    ...and the code using it would look something like this:

    #include "queue.hpp"
    #include <iostream>
    #include <thread>
    #include <chrono>
    
    void sender(queue<int> &dest) {
        for (int i = 0; i < 10; i++)
            dest.push(i);
    }
    
    int main() {
        using namespace std::literals;
    
        queue<int> q;
    
        auto t = std::thread([&] { sender(q); });
    
        int val;
    
        while (q.pop(val)) {
            std::cout << val << std::endl;
            std::this_thread::sleep_for(1s);
        }
    
        t.join();
    }
    

    You could also write the thread-safe queue using native C++ primitives instead. For one example, Anthony Williams outlined a thread-safe queue using a condition variable some time ago:

    https://www.justsoftwaresolutions.co.uk/threading/implementing-a-thread-safe-queue-using-condition-variables.html

    That changes the interface slightly, but only slightly (and you could modify it to have an interface about like I've shown above, if you preferred).
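
    As a rough idea of what that might look like, here is a minimal sketch of a condition-variable queue with the same push/pop shape as the Windows version above. This is not the code from the linked article; the name cv_queue is made up for illustration, and unlike the semaphore version it is unbounded, so push never blocks.

```cpp
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <utility>

// Hypothetical name: an unbounded queue guarded by a mutex and a
// condition variable, using only standard C++11 facilities.
template <class T>
class cv_queue {
    std::mutex m;                       // protects items
    std::condition_variable data_avail; // notified after each push
    std::queue<T> items;
public:
    void push(T data) {
        {
            std::lock_guard<std::mutex> lock(m);
            items.push(std::move(data));
        }
        // Notify after unlocking so the woken thread can take the lock.
        data_avail.notify_one();
    }

    // Waits up to max_wait for an item; returns false on timeout.
    bool pop(T &dest,
             std::chrono::milliseconds max_wait = std::chrono::milliseconds(100)) {
        std::unique_lock<std::mutex> lock(m);
        // The predicate guards against spurious wakeups.
        if (!data_avail.wait_for(lock, max_wait,
                                 [this] { return !items.empty(); })) {
            return false;
        }
        dest = std::move(items.front());
        items.pop();
        return true;
    }
};
```

    With that, the sender/main example above should work by replacing queue<int> with cv_queue<int>, since push and pop are called the same way.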