Search code examples
cwinsockwinsock2iocp

IOCP notification without completion key


I am building an IOCP/RIO Winsock server and have been struggling with getting the proper notification from the completion port when my AcceptEx() calls are triggered by a client request.

When I call GetQueuedCompletionStatus() after having sent a client request, I will get a successful return but without the completion key including any data.

I have used PostQueuedCompletionStatus() to get completion keys, but when my AcceptEx() calls post notification, I am not getting the completion key, just the notification.

I'm sure I'm not properly setting up or calling something, I just can't find where that is. The question here was surprisingly similar, but in the end the answers didn't solve my problem.

To distill the problem to its simplest form I started a new project and just copied over the essential bits to replicate the problem. All variables are global so nothing is out of scope, and for the sake of posting the code I've removed error checking and any loops. We simply create a single socket and give the client a few seconds to request something, then we check to see if the completion key was sent. That's it for this proof-of-error project.

The comments for the globals are arranged in the order they appear in the code and duplicate the code comments for referencing.

//server main variables
const int maxSockets = 10;
const int bufferSlicesPerSocket = 3;
const int bufferSliceSize = 400;
const int numRecvToPostPerSocket = 2;
atomic<bool> runApp(true);
bool bResult;
int iResult;

//initWinsock
WSADATA wsaData;
struct addrinfo *ptr = NULL;
struct addrinfo hints;

//resolveAddress
struct addrinfo *result = NULL;

//createIOCP1
HANDLE hCompPort1;

//createListenSocket
SOCKET listenSocket = INVALID_SOCKET;

//associateListenSocketWithIOCP

//bindListenSocket

//startListenOnSocket

//getAcceptExPointer
LPFN_ACCEPTEX pAcceptEx = NULL;
GUID guidForAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes = 0;

//getRIO
RIO_EXTENSION_FUNCTION_TABLE rio;
GUID functionTableId = WSAID_MULTIPLE_RIO;

//registerBigBuffer
RIO_BUFFERID bigBufferId;
const int totalBufferSlices = maxSockets * bufferSlicesPerSocket;
const int bigBufferSize = totalBufferSlices * bufferSliceSize;
char bigBuffer[bigBufferSize];
char * pBigBuffer = bigBuffer;

//fillBufferSliceQueue
RIO_BUF bufferSliceArray[totalBufferSlices];
concurrent_queue<int> availableBufferSliceIdQueue;

//createCompletionQueue
RIO_CQ recvCQ;
RIO_CQ sendCQ;
const int RQMaxRecvs = 5;
const int RQMaxSends = 5;
DWORD recvCQsize = maxSockets * RQMaxRecvs;
DWORD sendCQsize = maxSockets * RQMaxSends;
RIO_NOTIFICATION_COMPLETION notify;
WSAOVERLAPPED ol;

//fill empty socket queue with all Ids
concurrent_queue<int> availableEmptySocketIdQueue;

//set per-handle data
ULONG_PTR cKeyArray[maxSockets];

//check for an empty socketId and create a socket
int socketId;

//create a socket
SOCKET socketArray[maxSockets] = { INVALID_SOCKET };

//then post an accept on the socket

//associate the socket with the completion port
HANDLE hCompPort2;

//empty overlapped structure
WSAOVERLAPPED olArray[maxSockets];

//post an acceptEx on socket
const int acceptExAddrBufferLength = (sizeof(sockaddr) + 16) * 2 * maxSockets;
char acceptExAddrBuffer[acceptExAddrBufferLength];

//then create a request queue on the socket
RIO_RQ requestQueueArray[maxSockets];

//then post a receive on the socket

//get next available buffer slice id
int bufferSliceId;
int postRecv_counter;

//post a recv

//zero memory for overlapped struct
LPOVERLAPPED pol;

//check for completion
ULONG_PTR pKey;

//record socket as being in use
bool socketInUse[maxSockets];
atomic<int> numSocketsInUse = 0;

//update new socket with the listening socket's options
int numResults;
const int maxDequeue = 10;
RIORESULT rioResultArray[maxDequeue];

//calculate contexts
int socketContext;
int requestContext;

Here is main() and the initialization:

int _tmain(int argc, _TCHAR* argv[]) {
    //initWinsock
    iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);

    ZeroMemory(&hints, sizeof(hints));

    hints.ai_family = AF_INET;
    hints.ai_socktype = SOCK_STREAM;
    hints.ai_protocol = IPPROTO_TCP;
    hints.ai_flags = AI_PASSIVE;

    //resolve local address and port to be used by the server
    iResult = getaddrinfo(NULL, DEFAULT_PORT, &hints, &result);

    //create a handle for the first completion port
    hCompPort1 = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, (u_long)0, 0);

    //create a socket for the server to listen to for client connections
    listenSocket = ::WSASocket(
        AF_INET,
        SOCK_STREAM,
        IPPROTO_TCP,
        NULL,
        0,
        WSA_FLAG_REGISTERED_IO | WSA_FLAG_OVERLAPPED);

    //associate the listening socket with the first completion port
    CreateIoCompletionPort((HANDLE)listenSocket, hCompPort1, (u_long)0, 0);

    //bind the listening socket
    iResult = ::bind(listenSocket, result->ai_addr, (int)result->ai_addrlen);

    //start listen on socket
    iResult = listen(listenSocket, SOMAXCONN);

    //get the AcceptEx pointer
    iResult = WSAIoctl(
        listenSocket,
        SIO_GET_EXTENSION_FUNCTION_POINTER,
        &guidForAcceptEx,
        sizeof(guidForAcceptEx),
        &pAcceptEx,
        sizeof(pAcceptEx),
        &dwBytes,
        NULL,
        NULL);

    //get RIO
    if (0 != WSAIoctl(
        listenSocket,
        SIO_GET_MULTIPLE_EXTENSION_FUNCTION_POINTER,
        &functionTableId,
        sizeof(GUID),
        (void**)&rio,
        sizeof(rio),
        &dwBytes,
        0,
        0
        )) {}

    //register big buffer
    bigBufferId = rio.RIORegisterBuffer(pBigBuffer, bigBufferSize);

    //fill bufferSlice queue
    for (int i = 0; i < totalBufferSlices; i++) {
        bufferSliceArray[i].BufferId = bigBufferId;
        bufferSliceArray[i].Offset = i * bufferSliceSize;
        bufferSliceArray[i].Length = bufferSliceSize;
        availableBufferSliceIdQueue.push(i);
    }

    //createCompletionQueue
    notify.Type = RIO_IOCP_COMPLETION;
    notify.Iocp.IocpHandle = hCompPort1;
    notify.Iocp.Overlapped = &ol;
    notify.Iocp.CompletionKey = NULL;

    //create completion queue
    recvCQ = rio.RIOCreateCompletionQueue(
        recvCQsize, //size of queue
        &notify);   //notification mechanism
    sendCQ = rio.RIOCreateCompletionQueue(
        sendCQsize, //size of queue
        &notify);   //notification mechanism

    socketId = 1; //simplified for demo

    //create a socket
    socketArray[socketId] = ::WSASocket(
        AF_INET,                                        //af
        SOCK_STREAM,                                    //type
        IPPROTO_TCP,                                    //protocol
        NULL,                                           //lpProtocolInfo
        0,                                              //group
        WSA_FLAG_REGISTERED_IO | WSA_FLAG_OVERLAPPED);  //dwFlags

    //set completion key
    cKeyArray[socketId] = socketId;

    //then post an accept on the socket

    //empty overlapped structure
    memset(&olArray[socketId], 0, sizeof(WSAOVERLAPPED));

We now get to associating the socket with the completion port:

//associate the socket with the completion port
hCompPort2 = CreateIoCompletionPort(
    (HANDLE)socketArray[socketId],  //file handle
    hCompPort1,                     //completion port
    cKeyArray[socketId],            //completion key
    NULL);                          //number of threads

//post an acceptEx on socket
bResult = pAcceptEx(
    listenSocket,                                                   //listen socket
    socketArray[socketId],                                          //accept socket
    &acceptExAddrBuffer[socketId * (sizeof(sockaddr) + 16) * 2],    //output buffer
    0,                                                              //data length
    sizeof(sockaddr) + 16,                                          //local address length
    sizeof(sockaddr) + 16,                                          //remote address length
    &dwBytes,                                                       //bytes received
    &olArray[socketId]);                                            //overlapped struct

//then create a request queue on the socket
requestQueueArray[socketId] = rio.RIOCreateRequestQueue(
    socketArray[socketId],  //socket
    RQMaxRecvs,             //max RECVs on queue
    1,                      //max recv buffers, set to 1
    RQMaxSends,             //max outstanding sends
    1,                      //max send buffers, set to 1
    recvCQ,                 //recv completion queue
    sendCQ,                 //send completion queue
    &socketArray[socketId]);//socket context

//then post a receive on the socket

//get next available buffer slice id
bufferSliceId = 1; //simplified for demo

//post a recv
bResult = rio.RIOReceive(
    requestQueueArray[socketId],        //socket request queue
    &bufferSliceArray[bufferSliceId],   //buffer slice
    1,                                  //set to 1
    0,                                  //flags
    &bufferSliceArray[bufferSliceId]);  //request context

wprintf(L"Please send request now...\n");

std::this_thread::sleep_for(std::chrono::milliseconds(3500));

And finally, dequeuing the notification, which doesn't include the completion key for some reason.

//send a dummy message to check the IOCP
//bResult = PostQueuedCompletionStatus(hCompPort2,(DWORD)0,(ULONG_PTR)socketId,NULL);   

//zero memory for overlapped struct
ZeroMemory(&pol, sizeof(LPOVERLAPPED));

//check for completion
iResult = GetQueuedCompletionStatus(
    hCompPort2, //completion port
    &dwBytes,   //number of bytes transferred
    &pKey,      //completion key pointer
    &pol,       //overlapped pointer
    0);         //milliseconds to wait

//update new socket with the listening socket's options
iResult = setsockopt(
    socketArray[pKey],          //socket
    SOL_SOCKET,                 //level
    SO_UPDATE_ACCEPT_CONTEXT,   //optname
    (char*)&listenSocket,       //*optval
    sizeof(listenSocket));      //optlen

wprintf(L"accepted socketId: %d\n", pKey);

return 0;
}

The completion key doesn't have the data, but when I manually send a PostQueuedCompletionStatus() I'll get the key with the notification. What am I failing to do in my AcceptEx() or CreateIoCompletionPort()???


Solution

  • You are assigning 0 as the completion key for your listening socket, and your RIO completion queues. You are assigning other completion keys for the client sockets that AcceptEx() populates when accepting clients. GetQueuedCompletionStatus() reports the completion key of the socket that was performing a queued IOCP operation.

    When AcceptEx() completes, GetQueuedCompletionStatus() reports the completion key of the listening socket (which is 0).

    When RIOReceive() completes, GetQueuedCompletionStatus() reports the completion key of the client socket/queue that was read from.

    So you need to assign unique completion keys for each type of socket (listening vs client), and call setsockopt(SO_UPDATE_ACCEPT_CONTEXT) only when you receive the listening socket's completion key. If you don't use completion keys in this manner, you have to use the OVERLAPPED struct to pass around your own custom per-socket context information instead, so you can still differentiate an AcceptEx() operation from other operations.