Search code examples
c++windowssocketsnetworkingio-completion-ports

IO Completion port returning NULL Completion Key


This is my first time using I/O completion ports. I'm having an issue where GetQueuedCompletionStatus returns NULL for the completion key, which I use to pass a data struct holding handles needed by other portions of the code.

Otherwise, GetQueuedCompletionStatus seems to be triggering just fine when messages are received.

I tried to include just the code involving the IO Completion ports:

The Data Structs:

    // A command string bundled with a mutex.
    // NOTE(review): cmd_mtx presumably guards `command`; no user of this
    // struct appears in the code shown here - confirm against the callers.
    typedef struct _THREAD_MESSAGE
{
    mutex cmd_mtx;
    string command;
} THREAD_MESSAGE, * LPTHREAD_MESSAGE;

// Per-listener state; ListenThread receives a pointer to one of these
// through its lpParam argument.
typedef struct _LISTEN_SOCKET_DATA
{
    SOCKET Socket;              // listening socket that ListenThread calls accept() on
    int    Port;                // copied into the data of every accepted connection
    HANDLE hAcceptEvent;        // event ListenThread waits on and hands to WSAEnumNetworkEvents
    HANDLE IOCP;                // completion port each accepted socket is associated with
    VOID* MessageProcessor;     // opaque pointer, copied unchanged into each accepted connection's data
    ConfigHandler* CfgHandle;   // copied unchanged into each accepted connection's data
    // Other information useful to be associated with the handle
} LISTEN_SOCKET_DATA, * LPLISTEN_SOCKET_DATA;

// Per-connection state. ListenThread allocates one per accepted socket and
// passes its address as the completion key; ServerWorkerThread reads it back
// from GetQueuedCompletionStatus.
typedef struct _CONNECTED_SOCKET_DATA
{
    SOCKET Socket;              // the accepted socket
    int Port;                   // copied from the listener; the worker compares it with SiteData::Port
    HANDLE IOCP;                // completion port the socket was associated with
    VOID* MessageProcessor;     // opaque pointer copied from the listener's data
    ConfigHandler* CfgHandle;   // the worker iterates CfgHandle->SiteConnections
} CONNECTED_SOCKET_DATA, * LPCONNECTED_SOCKET_DATA;

// Values stored in PER_IO_OPERATION_DATA::OperationType.
#define OPERATION_TYPE_UNKNOWN      0
#define OPERATION_TYPE_SEND         1
#define OPERATION_TYPE_RECV         2
// State for one overlapped I/O request.
// NOTE(review): Overlapped is a pointer to a separately allocated OVERLAPPED,
// so the address handed to WSARecv does not lie inside this struct;
// CONTAINING_RECORD on that address cannot recover the struct.
typedef struct
{
    OVERLAPPED* Overlapped;     // allocated with new and zeroed by ListenThread
    CHAR Buffer[DATA_BUFSIZE];  // receive buffer that the WSABUF points at
    int BufferLen;              // set to 0 when the request is created
    int OperationType;          // one of the OPERATION_TYPE_* values
    string PacketName;          // not assigned anywhere in the code shown
} PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;

The completion port was initialized with:

// INVALID_HANDLE_VALUE with a NULL existing port creates a new completion
// port that is not yet associated with any handle; sockets are attached
// later, one CreateIoCompletionPort call per accepted connection.
// NOTE(review): the result is not checked for NULL in the code shown.
m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);

The Listener:

// Listener thread: waits on the listening socket's accept event, accepts each
// incoming connection, associates the new socket with the completion port and
// posts the first overlapped receive on it.
//
// lpParam: pointer to the LISTEN_SOCKET_DATA describing the listening socket.
// Returns 0 when the initial WSARecv reports SOCKET_ERROR.
// NOTE(review): every `break` path leaves the loop and falls off the end of
// the function without a return statement, although the return type is DWORD.
DWORD ListenThread(LPVOID lpParam)
{
    LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
    WSANETWORKEVENTS NetworkEvents;
    DWORD dwRet;
    SOCKADDR_IN NewSockAddr;
    SOCKET      NewSocket;
    int         nLen;

    while (true) // runs until an error path breaks out or returns
    {
        // Wait up to 100 ms for the accept event to be signalled
        dwRet = WSAWaitForMultipleEvents(1,
            &(pSocketData->hAcceptEvent),
            false,
            100,
            false);

        // Nothing happened, back to the top
        // NOTE(review): only the timeout result is tested; any other return
        // value, including a failed wait, falls through to the code below.
        if (dwRet == WSA_WAIT_TIMEOUT)
            continue;

        // We got an event; find out which one.
        int nRet = WSAEnumNetworkEvents(pSocketData->Socket,
            pSocketData->hAcceptEvent,
            &NetworkEvents);
        if (nRet == SOCKET_ERROR)
        {
            wprintf(L"WSAEnumNetworkEvents error %ld\n", WSAGetLastError());
            break;
        }

        // We got an accept event
        if (NetworkEvents.lNetworkEvents & FD_ACCEPT)
        {
            // Check the error code recorded for the accept event
            if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
            {
                
                // Accept new connection
                nLen = sizeof(SOCKADDR_IN);
                NewSocket = accept(pSocketData->Socket,
                    (LPSOCKADDR)&NewSockAddr,
                    &nLen);
                // NOTE(review): accept() documents INVALID_SOCKET as its
                // failure value; this compares against SOCKET_ERROR instead.
                if (NewSocket == SOCKET_ERROR)
                {
                    wprintf(L"accept() error %ld\n", WSAGetLastError());
                    break;
                }

                wprintf(L"Accepted Connection %ld", NewSockAddr.sin_addr.S_un.S_addr);

                // Enable TCP_NODELAY on the new connection
                const char chOpt = 1;
                int nErr = setsockopt(NewSocket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
                if (nErr == -1)
                {
                    // NOTE(review): NewSocket is not closed on this path.
                    wprintf(L"setsockopt() error %ld\n", WSAGetLastError());
                    break;
                }


                // Per-connection data; its address becomes the completion key
                LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;

                ZeroMemory(ConnectedSocketData, sizeof(CONNECTED_SOCKET_DATA));

                ConnectedSocketData->Socket = NewSocket;
                ConnectedSocketData->Port = pSocketData->Port;
                ConnectedSocketData->IOCP = pSocketData->IOCP;
                ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
                ConnectedSocketData->MessageProcessor = pSocketData->MessageProcessor;

                // Add the new socket to the completion port; completions for the socket will be queued up for processing by worker threads.
                // NOTE(review): the (DWORD) cast keeps only the low 32 bits of
                // the pointer, so on a 64-bit build the completion key handed
                // back to the worker is not the original address. The
                // parameter is pointer-sized (ULONG_PTR).
                if (CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD)ConnectedSocketData, 0) == NULL)
                {
                    wprintf(L"CreateIOCompletionPort error %ld\n", WSAGetLastError());
                    delete ConnectedSocketData;
                    ConnectedSocketData = NULL;
                    closesocket(NewSocket);
                    break;
                }                

                // Set up the per-I/O data; it will be used at completion time
                LPPER_IO_OPERATION_DATA PerIOOperationData = new PER_IO_OPERATION_DATA;
                PerIOOperationData->BufferLen = 0;
                PerIOOperationData->OperationType = OPERATION_TYPE_RECV;
                DWORD RecvBytes = 0;
                DWORD Flags = 0;
                WSABUF DataBuf;
                DataBuf.len = DATA_BUFSIZE;
                DataBuf.buf = PerIOOperationData->Buffer;
                // The OVERLAPPED is a separate heap object, not a member stored
                // inside PerIOOperationData
                PerIOOperationData->Overlapped = new OVERLAPPED;
                ZeroMemory(PerIOOperationData->Overlapped, sizeof(OVERLAPPED));

                // Kick off the first receive request for the socket; its completion will be delivered through the completion port.
                // NOTE(review): an overlapped WSARecv that was queued
                // successfully also returns SOCKET_ERROR, with
                // WSAGetLastError() == WSA_IO_PENDING; this code treats that
                // as a failure and ends the thread. Confirm against the
                // WSARecv documentation.
                if (WSARecv(NewSocket, &DataBuf, 1, &RecvBytes, &Flags, (PerIOOperationData->Overlapped), NULL) == SOCKET_ERROR)
                {
                    wprintf(L"WSARecv error %ld\n", WSAGetLastError());
                    return 0;
                }
            }
            else
            {
                // NOTE(review): the failing code is in
                // NetworkEvents.iErrorCode[FD_ACCEPT_BIT], but the message
                // prints WSAGetLastError() instead.
                wprintf(L"Unknown network event error %ld\n", WSAGetLastError());
                break;
            }
        }
    }
}

The worker thread, which crashes when it tries to use ConnectedSocketData because the pointer is NULL:

    // Worker thread: dequeues completion packets from the completion port and
    // processes them. (The listing is cut off before the end of the function.)
    //
    // lpParam: the completion port handle, passed by value as a pointer.
DWORD ServerWorkerThread(LPVOID lpParam)
{
    HANDLE CompletionPort = (HANDLE)lpParam;
    DWORD BytesTransferred = 0;
    OVERLAPPED* lpOverlapped = NULL;
    LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
    LPPER_IO_OPERATION_DATA PerIoData = NULL;
    DWORD Flags = 0;
    WSABUF DataBuf;
    DWORD RecvBytes = 0;
    int DestinationAddress = 0;

    while (TRUE) // run forever
    {
        // Block until a completion packet is available; the completion key is
        // written into ConnectedSocketData and the OVERLAPPED address into
        // lpOverlapped.
        if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&ConnectedSocketData, (LPOVERLAPPED*)&lpOverlapped, INFINITE) == 0)
        {
            DWORD Err = GetLastError();
            // NOTE(review): the wait above uses INFINITE, so a WAIT_TIMEOUT
            // result is not expected here.
            if (Err != WAIT_TIMEOUT)
            {
                printf("GetQueuedCompletionStatus() failed with error %d\n", Err);

                // NOTE(review): ConnectedSocketData is dereferenced without a
                // NULL check; when no packet was dequeued (lpOverlapped is
                // NULL) the key is not meaningful - confirm against the
                // GetQueuedCompletionStatus documentation.
                if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
                {
                    printf("closesocket() failed with error %d\n", WSAGetLastError());
                    return 0;
                }

                // NOTE(review): ListenThread allocates this object with `new`,
                // but it is released here with GlobalFree. The per-I/O data
                // belonging to the failed request is not released on this path.
                GlobalFree(ConnectedSocketData);
            }
            continue;
        }

        // Retrieve the per-I/O data
        // NOTE(review): PER_IO_OPERATION_DATA::Overlapped is a pointer member,
        // while lpOverlapped is the address of the separately allocated
        // OVERLAPPED, so this does not yield the struct that owns the request.
        PerIoData = CONTAINING_RECORD(lpOverlapped, PER_IO_OPERATION_DATA, Overlapped);

        vector<SiteData>::iterator SiteDataIterator;
        vector<InstrumentData>::iterator InstrumentDataIterator;

        // Find the first site whose Port matches this connection's Port; the
        // iterator equals end() when there is no match.
        for (SiteDataIterator = ConnectedSocketData->CfgHandle->SiteConnections.begin();
            SiteDataIterator != ConnectedSocketData->CfgHandle->SiteConnections.end();
            SiteDataIterator++)
        {
            if (SiteDataIterator->Port == ConnectedSocketData->Port)
            {
                break;
            }
        }

Any ideas why the IOCP is not passing the completion key?


Solution

  • Any Ideas why the IOCP is not passing the Completion Key?

    Of course it passes back exactly what you passed in: the completion key given to CreateIoCompletionPort and the pointer to OVERLAPPED given to the I/O call.

    But first of all,

    CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD)ConnectedSocketData, 0)
    

    is wrong - only the low 32 bits of ConnectedSocketData are used here, so on a 64-bit build the pointer is truncated. It must be

    CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD_PTR)ConnectedSocketData, 0)
    

    then, your definition of PER_IO_OPERATION_DATA

    // Definition as posted in the question: Overlapped is only a pointer to
    // an OVERLAPPED object stored elsewhere.
    typedef struct
    {
        OVERLAPPED* Overlapped;
        CHAR Buffer[DATA_BUFSIZE];
        int BufferLen;
        int OperationType;
        string PacketName;
    } PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;
    

    is always a critical error. It must be

    // Corrected definition: the OVERLAPPED is embedded by value, so the
    // address of the Overlapped member lies inside the struct and the struct
    // can be recovered from it.
    typedef struct
    {
        OVERLAPPED Overlapped;
        CHAR Buffer[DATA_BUFSIZE];
        int BufferLen;
        int OperationType;
        string PacketName;
    } PER_IO_OPERATION_DATA, * LPPER_IO_OPERATION_DATA;
    

    From Overlapped (when it is defined as OVERLAPPED* Overlapped) it is impossible to get back the address of the PER_IO_OPERATION_DATA, because the OVERLAPPED object lives outside the struct. From &Overlapped (when it is defined as OVERLAPPED Overlapped) it is possible.

    Better still, I would use the following definition

    // Alternative definition: the struct derives from OVERLAPPED, so a pointer
    // to it can be passed directly as the overlapped argument of WSARecv and
    // converted back with static_cast when the completion is dequeued.
    struct PER_IO_OPERATION_DATA : public OVERLAPPED
    {
        CHAR Buffer[DATA_BUFSIZE];
        int BufferLen;
        int OperationType;
        string PacketName;
    };
    

    and use

    WSARecv(NewSocket, &DataBuf, 1, &RecvBytes, &Flags, PerIOOperationData, NULL)
    

    and

    PerIoData = static_cast<PER_IO_OPERATION_DATA*>(lpOverlapped);
    

    You also handle the case where GetQueuedCompletionStatus returns FALSE incorrectly - you leak PerIoData in this case.

    These are only the most important errors in your code, the ones related to your direct question.