Search code examples
c++windowssocketsnetwork-programmingio-completion-ports

Recv Buffer for IO Completion port always empty


Ok this is related to a previous post, but it's a different error so I made a new question. Previous Post: IO Completion port returning NULL Completion Key

So getting a receive message on the IO completion port i set up, It triggers GetQueuedCompletionStatus and returns with the Completion Key and the Overlapped data. Both appear to be good and I can see data popluated in their structures. However the Buffer which was passed to WSARecv was not populated with the incoming message. (the BytesTransfered indicates that there were bytes received, but no data in the WSABUF).

Here is the code as it currently stands, looking for help as to why the Buffer isn't being populated.

networkhandlerthread.ccp

#include "NetworkHandlerThread.h"

// Worker thread, processes IOCP messages.
DWORD ServerWorkerThread(LPVOID lpParam)
{
    HANDLE CompletionPort = (HANDLE)lpParam;
    DWORD BytesTransferred = 0;
    OVERLAPPED* lpOverlapped = NULL;
    LPCONNECTED_SOCKET_DATA ConnectedSocketData = NULL;
    LPPER_IO_OPERATION_DATA PerIoData = NULL;
    DWORD Flags = 0;
    WSABUF* DataBuf;
    DWORD RecvBytes = 0;
    Type1MessageParser Type1MsgParser;
    Type2MessageParser Type2MsgParser;
    int DestinationAddress = 0;
    bool IsType1 = false;

    while (TRUE)//run forever
    {
        //Check for new message
        if (GetQueuedCompletionStatus(CompletionPort, &BytesTransferred, (PULONG_PTR)&ConnectedSocketData, (LPOVERLAPPED*)&PerIoData, INFINITE) == 0)
        {
            DWORD Err = GetLastError();
            if (Err != WAIT_TIMEOUT)
            {
                printf("GetQueuedCompletionStatus() failed with error %d\n", Err);

                if (closesocket(ConnectedSocketData->Socket) == SOCKET_ERROR)
                {
                    printf("closesocket() failed with error %d\n", WSAGetLastError());
                    return 0;
                }

                GlobalFree(ConnectedSocketData);
            }
            continue;
        }


        //We have a message, determine if it's something we receaved or something we should send.
        if (PerIoData->OperationType == OPERATION_TYPE_RECV)
        {
            ///tbd process recv
            ConnectedSocketData; //this is comming in good and has data
            PerIoData->Buffer; // this is empty (pointer is good, but no data)
        }
        else if (PerIoData->OperationType == OPERATION_TYPE_SEND)
        {
            ///tbd process send
        }
    }
};


//Thread for handling Listener sockets and Accepting connections
DWORD ListenThread(LPVOID lpParam)
{
    LPLISTEN_SOCKET_DATA pSocketData = (LPLISTEN_SOCKET_DATA)(lpParam);
    WSANETWORKEVENTS NetworkEvents;
    DWORD dwRet;
    SOCKADDR_IN NewSockAddr;
    SOCKET      NewSocket;
    int         nLen;

    while (true) //run forever
    {
        //Wait for event
        dwRet = WSAWaitForMultipleEvents(1,
            &(pSocketData->hAcceptEvent),
            false,
            100,
            false);

        //Nothing happened, back to top
        if (dwRet == WSA_WAIT_TIMEOUT)
            continue;

        //We got a event, find out which one.
        int nRet = WSAEnumNetworkEvents(pSocketData->Socket,
            pSocketData->hAcceptEvent,
            &NetworkEvents);
        if (nRet == SOCKET_ERROR)
        {
            wprintf(L"WSAEnumNetworkEvents error %ld\n", WSAGetLastError());
            break;
        }

        //We got a Accept event
        if (NetworkEvents.lNetworkEvents & FD_ACCEPT)
        {
            //Check for errors
            if (NetworkEvents.iErrorCode[FD_ACCEPT_BIT] == 0)
            {

                // Accept new connection
                nLen = sizeof(SOCKADDR_IN);
                NewSocket = WSAAccept(pSocketData->Socket,
                    (LPSOCKADDR)&NewSockAddr,
                    &nLen, NULL, NULL);
                if (NewSocket == SOCKET_ERROR)
                {
                    wprintf(L"accept() error %ld\n", WSAGetLastError());
                    break;
                }

                wprintf(L"Accepted Connection %ld", NewSockAddr.sin_addr.S_un.S_addr);

                //Set new connection as TCP connection, No Delay
                //const char chOpt = 1;
                //int nErr = setsockopt(NewSocket, IPPROTO_TCP, TCP_NODELAY, &chOpt, sizeof(char));
                //if (nErr == -1)
                //{
                //    wprintf(L"setsockopt() error %ld\n", WSAGetLastError());
                //    break;
                //}


                LPCONNECTED_SOCKET_DATA ConnectedSocketData = new CONNECTED_SOCKET_DATA;

                ZeroMemory(ConnectedSocketData, sizeof(CONNECTED_SOCKET_DATA));

                ConnectedSocketData->Socket = NewSocket;
                ConnectedSocketData->Port = pSocketData->Port;
                ConnectedSocketData->IOCP = pSocketData->IOCP;
                ConnectedSocketData->CfgHandle = pSocketData->CfgHandle;
                ConnectedSocketData->ForwardMessager = pSocketData->ForwardMessager;

                //Add the new socket to the completion port, message from the socker will be queued up for proccessing by worker threads.
                if (CreateIoCompletionPort((HANDLE)NewSocket, pSocketData->IOCP, (DWORD_PTR)ConnectedSocketData, 0) == NULL)
                {
                    wprintf(L"CreateIOCompletionPort error %ld\n", WSAGetLastError());
                    delete ConnectedSocketData;
                    ConnectedSocketData = NULL;
                    closesocket(NewSocket);
                    break;
                }

                //Set the PerIOData, will be used at completion time
                LPPER_IO_OPERATION_DATA PerIoData;
                PerIoData = (LPPER_IO_OPERATION_DATA)GlobalAlloc(GPTR, sizeof(PER_IO_OPERATION_DATA));

                ZeroMemory(&(PerIoData->overlapped), sizeof(OVERLAPPED));
                PerIoData->BufferLen = 0;
                PerIoData->OperationType = OPERATION_TYPE_RECV;
                DWORD RecvBytes = 0;
                DWORD Flags = 0;
                PerIoData->Buffer.buf = PerIoData->cBuffer;
                PerIoData->Buffer.len = DATA_BUFSIZE;


                //Kick off the first Recv request for the Socket, will be handled by the completion Queue.
                if (WSARecv(NewSocket, &(PerIoData->Buffer), 1, &RecvBytes, &Flags, &(PerIoData->overlapped), NULL) == SOCKET_ERROR)
                {
                    wprintf(L"WSARecv error %ld\n", WSAGetLastError());
                    return 0;
                }
            }
            else
            {
                wprintf(L"Unknown network event error %ld\n", WSAGetLastError());
                break;
            }
        }
    }
}


NetworkHandlerThread::NetworkHandlerThread()
{
    m_CompletionPort = 0;
    m_hListenThread = 0;
}

NetworkHandlerThread::~NetworkHandlerThread()
{

}

void NetworkHandlerThread::StartNetworkHandler()
{
    int iResult = 0;
    SYSTEM_INFO SystemInfo;
    unsigned int i = 0;

    //Start WSA
    iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
    if (iResult != NO_ERROR) {
        wprintf(L"WSAStartup() failed with error: %d\n", iResult);
        return;
    }

    //Start Completion Port
    m_CompletionPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    if (m_CompletionPort != NULL)
    {
        wprintf(L"Completion Port Created\n");
    }

    //Get # of system processors
    GetSystemInfo(&SystemInfo);

    //create Worker Threads for each processor.
    for (i = 0; i < SystemInfo.dwNumberOfProcessors * THREADS_PER_PROCESSOR; i++)
    {
        HANDLE ThreadHandle;

        // Create a server worker thread, and pass the
        // completion port to the thread. 
        ThreadHandle = CreateThread(NULL, 0, ServerWorkerThread, m_CompletionPort, 0, NULL);

        // Close the thread handle
        if (ThreadHandle != NULL)
        {
            CloseHandle(ThreadHandle);
        }
    }
}

void NetworkHandlerThread::AddListenThread(int Port,
    ConfigHandler* pConfigHandle,
    void* ForwardHandle)
{
    SOCKADDR_IN InternetAddr;
    int iResult = 0;
    LPLISTEN_SOCKET_DATA pListenSocketData = new LISTEN_SOCKET_DATA;

    if (pListenSocketData == NULL)
    {
        return;
    }

    //Create the listener Socket
    pListenSocketData->Socket = WSASocket(AF_INET, SOCK_STREAM, 0, NULL, 0, WSA_FLAG_OVERLAPPED);
    if (pListenSocketData->Socket == INVALID_SOCKET)
    {
        wprintf(L"socket function failed with error: %ld\n", WSAGetLastError());
        WSACleanup();
        return;
    }

    // Create a Event to handle Socket Accepts
    pListenSocketData->hAcceptEvent = WSACreateEvent();
    if (pListenSocketData->hAcceptEvent == WSA_INVALID_EVENT)
    {
        wprintf(L"WSACreateEvent() error %ld\n", WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        return;
    }

    // Set the Event to Trigger on FD_ACCEPT (this occurs on socket connection attempts)
    int nRet = WSAEventSelect(pListenSocketData->Socket,
        pListenSocketData->hAcceptEvent,
        FD_ACCEPT);
    if (nRet == SOCKET_ERROR)
    {
        wprintf(L"WSAAsyncSelect() error %ld\n", WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        return;
    }

    //Assign the Port Number
    InternetAddr.sin_family = AF_INET;
    InternetAddr.sin_addr.s_addr = htonl(INADDR_ANY);
    InternetAddr.sin_port = htons(Port);
    pListenSocketData->Port = Port;
    pListenSocketData->IOCP = m_CompletionPort;
    pListenSocketData->CfgHandle = pConfigHandle;
    pListenSocketData->ForwardMessager = ForwardHandle;

    //Bind the Socket to the Port
    iResult = ::bind((pListenSocketData->Socket), (sockaddr*)&InternetAddr, sizeof(InternetAddr));
    if (iResult == SOCKET_ERROR) {
        wprintf(L"bind function failed with error %d\n", WSAGetLastError());
        iResult = closesocket(pListenSocketData->Socket);
        if (iResult == SOCKET_ERROR)
            wprintf(L"closesocket function failed with error %d\n", WSAGetLastError());
        WSACleanup();
        return;
    }

    //Listen for incoming connection requests.
    if (listen(pListenSocketData->Socket, SOMAXCONN) == SOCKET_ERROR)
    {
        wprintf(L"listen function failed with error: %d\n", WSAGetLastError());
        closesocket(pListenSocketData->Socket);
        WSACleanup();
        return;
    }

    wprintf(L"Listening on %ld", Port);

    m_hListenThread = (HANDLE)CreateThread(NULL,                // Security
        0,                  // Stack size - use default
        ListenThread,  // Thread fn entry point
        (void*)pListenSocketData, //Listen Socket Data
        0,                  // Init flag
        NULL);  // Thread address
}

NetworkHandlerThread.h

#pragma once
#include <WinSock2.h>
#include <ws2tcpip.h>
#include <stdio.h>
#include "ForwardMessageHandler.h"
#include "ConfigHandler.h"
#include "Type1MessageParser.h"
#include "Type2Message-Parser.h"
#include "ThreadUtilities.h"

#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2

class NetworkHandlerThread
{
public:
    WSADATA wsaData;
    HANDLE m_CompletionPort;
    HANDLE m_hListenThread;

public:
    NetworkHandlerThread();
    ~NetworkHandlerThread();

    void StartNetworkHandler();

    void AddListenThread(int Port,
        ConfigHandler* pConfigHandle,
        void* ForwardHandle);
};

ThreadUtilities.h

#pragma once
#include <mutex>
#include "ConfigHandler.h"


using namespace std;

#define DATA_BUFSIZE 8192
#define THREADS_PER_PROCESSOR 2

typedef struct _THREAD_MESSAGE
{
    mutex cmd_mtx;
    string command;
} THREAD_MESSAGE, * LPTHREAD_MESSAGE;

typedef struct _LISTEN_SOCKET_DATA
{
    SOCKET Socket;
    int    Port;
    HANDLE hAcceptEvent;
    HANDLE IOCP;
    VOID* ForwardMessager;
    ConfigHandler* CfgHandle;
    // Other information useful to be associated with the handle
} LISTEN_SOCKET_DATA, * LPLISTEN_SOCKET_DATA;

typedef struct _CONNECTED_SOCKET_DATA
{
    SOCKET Socket;
    int Port;
    HANDLE IOCP;
    VOID* ForwardMessager;
    ConfigHandler* CfgHandle;
} CONNECTED_SOCKET_DATA, * LPCONNECTED_SOCKET_DATA;

#define OPERATION_TYPE_UNKNOWN      0
#define OPERATION_TYPE_SEND         1
#define OPERATION_TYPE_RECV         2
typedef struct PER_IO_OPERATION_DATA
{
    OVERLAPPED overlapped;
    WSABUF Buffer;
    char cBuffer[DATA_BUFSIZE];
    int BufferLen;
    int OperationType;
    string PacketName;
};

#define LPPER_IO_OPERATION_DATA PER_IO_OPERATION_DATA

Solution

  • So, dumb mistake after looking at code for to long(caught it right away after a good nights sleep), Recv getting hex data in, first character is 0x00. When looking at it in debugger shows up as empty text string, when examined further, all the bytes are in the buffer.

    Above is a good example of a working IOCP socket so I'm going to leave this here for people reference.