
select() not detecting incoming data


Objective: N nodes (running on different machines) should communicate with each other over TCP connections. Sending and receiving messages is done by 2 threads created by the process. Initially the main process connects all nodes with each other, creates the 2 threads, and gives them a list of file descriptors that the threads can use to send and receive data. The structure below is filled in by the main process and passed to the threads.

typedef struct
{
    char hostName[MAXIMUM_CHARACTERS_IN_HOSTNAME];  /* Host name of the node */
    char portNumber[MAXIMUM_PORT_LENGTH];           /* Port number of the node */
    char nodeId[MAXIMUM_NODE_ID_LENGTH];            /* Node ID of the node */
    int socketFd;                                   /* Socket file descriptor */
    int socketReady;        /* Flag to indicate if socket information is filled */
}SNodeInformation;

PS: socketFd is the socket descriptor returned by either accept() or socket(), depending on how the connection was established (either by listening for connections from a node or by connecting to a node).

An array of SNodeInformation of size MAXIMUM_NUMBER_OF_NODES is used.

The send thread goes through the nodeInformation array and sends the message "Hello" to all nodes except itself, as shown below.

void *sendMessageThread(void *pNodeInformation) {

    int i;
    int ownNodeId;
    int bytesSent = 0;
    char ownHostName[MAXIMUM_CHARACTERS_IN_HOSTNAME];

    SNodeInformation *nodeInformation = (SNodeInformation *) pNodeInformation;
    SNodeInformation *iterNodeInformation;

    printf("SendMessageThread: Send thread created\n");

    if(gethostname(ownHostName, MAXIMUM_CHARACTERS_IN_HOSTNAME) != 0) {
        perror("Error: sendMessageThread, gethostname failed\n");
        exit(1);
    }

    for(i=0, iterNodeInformation=nodeInformation ; i<MAXIMUM_NUMBER_OF_NODES ; i++, iterNodeInformation++) {
        if(strcmp((const char*) iterNodeInformation->hostName, (const char*) ownHostName) != 0)  {
            /* Send message to all nodes except yourself */
            bytesSent = send(iterNodeInformation->socketFd, "Hello", 6, 0);

            if(bytesSent == -1) {
                printf("Error: sendMessageThread, sending failed, code: %s FD %d\n", strerror(errno), iterNodeInformation->socketFd);
            }
        }
    }

    pthread_exit(NULL);
}

The receive thread goes through the nodeInformation array, sets up a file descriptor set, and uses select() to wait for incoming data, as shown below.

void *receiveMessageThread(void *pNodeInformation)
{
    int i;
    int fileDescriptorMax = -1;
    int doneReceiving = 0;
    int numberOfBytesReceived = 0;
    int receiveCount = 0;
    fd_set readFileDescriptorList;
    char inMessage[7];      /* 6 bytes of payload plus room for the terminating '\0' */

    SNodeInformation *nodeInformation = (SNodeInformation *) pNodeInformation;
    SNodeInformation *iterNodeInformation;

    printf("ReceiveMessageThread: Receive thread created\n");

    /* Initialize the read file descriptor */
    FD_ZERO(&readFileDescriptorList);

    for(i=0, iterNodeInformation=nodeInformation ; i<MAXIMUM_NUMBER_OF_NODES ; i++, iterNodeInformation++) {
        FD_SET(iterNodeInformation->socketFd, &readFileDescriptorList);

        if(iterNodeInformation->socketFd > fileDescriptorMax) {
            fileDescriptorMax = iterNodeInformation->socketFd;
        }
    }

    printf("ReceiveMessageThread: fileDescriptorMax:%d\n", fileDescriptorMax);

    while(!doneReceiving) {
        if (select(fileDescriptorMax+1, &readFileDescriptorList, NULL, NULL, NULL) == -1) {
            perror("Error receiveMessageThread, select failed \n");
            return NULL;    /* thread functions return void *, not int */
        }

        for(i=0 ; i<fileDescriptorMax ; i++) {
            if (FD_ISSET(i, &readFileDescriptorList)) {
                /* Check if any FD was set */
                printf("ReceiveThread: FD set %d\n", i);

                /* Receive data from one of the nodes */
                if ((numberOfBytesReceived = recv(i, &inMessage, 6, 0)) <= 0) {
                    /* Got error or connection closed by client */
                    if (numberOfBytesReceived == 0) {
                        /* Connection closed */
                        printf("Info: receiveMessageThread, node %d hung up\n", i);
                    }
                    else {
                        perror("Error: receiveMessageThread, recv FAILED\n");
                    }

                    close(i);

                    /* Remove from Master file descriptor set */
                    FD_CLR(i, &readFileDescriptorList); 
                    doneReceiving = 1;
                }
                else {
                    /* Valid data from a node */
                    inMessage[6] = '\0';

                    if(++receiveCount == MAXIMUM_NUMBER_OF_NODES-1) {
                        doneReceiving = 1;
                    }

                    printf("ReceiveThread: %s received, count: %d\n", inMessage, receiveCount);
                }
            }
        }
    }
    pthread_exit(NULL);
}

Expected Output: I tried with just 2 processes, P1 (started first) running on machine1 and P2 running on machine2. Both processes should first connect, and then their threads should send and receive the message "Hello" and exit.

Observed Output: P1 is able to send the message, and P2 (receive thread) receives the message "Hello". But P1 (receive thread) does not get the message from P2 (send thread). The application code is the same on both machines, but every time, the process started first does not get the message from the other process. I added a print to check whether any file descriptor was set; I see it for P2 but not for P1. The send to the process that never receives is not failing; it returns 6. I checked the maximum value of the file descriptors, and it's correct.

If I start P2 first and then P1, I can see that P1 receives the message from P2 and exits, while P2 waits indefinitely for the message from P1.

I am not sure whether the problem is caused by incorrect use of socket descriptors or by the threads.


Solution

  • Two issues:

    1. The loop that tests whether a file descriptor is set does not cover all file descriptors put into the set: it stops at fileDescriptorMax - 1 and never tests fileDescriptorMax itself. (This programming error is expected to be the reason for the malfunction described in the OP.)

    2. The sets of file descriptors passed to select() are modified by select(), so the sets need to be re-initialized before calling select() again. (This programming error would only be noticeable if data is to be received from more than one socket.)
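
Both effects can be reproduced without any networking. The following is only a minimal sketch, not the OP's code: it assumes two pipes as stand-ins for the sockets, and the helper names countReadyFds() and runSelectDemo() are made up for illustration. It writes "Hello" only to the pipe with the highest read descriptor, calls select() on a copy of the set, and then scans the result with both loop bounds.

```c
#include <stddef.h>
#include <sys/select.h>
#include <unistd.h>

/* Hypothetical helper: count the descriptors marked readable in readySet.
 * inclusive != 0 scans 0..maxFd; inclusive == 0 scans 0..maxFd-1,
 * which is the loop bound used in the question. */
int countReadyFds(fd_set *readySet, int maxFd, int inclusive)
{
    int limit = inclusive ? maxFd + 1 : maxFd;
    int count = 0;
    int i;

    for (i = 0; i < limit; i++) {
        if (FD_ISSET(i, readySet)) {
            count++;
        }
    }
    return count;
}

/* Hypothetical demo: returns 0 on success and -1 on failure.
 * On success the three output parameters report what select() left behind. */
int runSelectDemo(int *foundExclusive, int *foundInclusive, int *idleStillSet)
{
    int pipeA[2];
    int pipeB[2];
    int lowRead, highRead, highWrite;
    fd_set masterSet;
    fd_set workingSet;
    struct timeval timeout;
    int rc = -1;

    if (pipe(pipeA) == -1) {
        return -1;
    }
    if (pipe(pipeB) == -1) {
        close(pipeA[0]);
        close(pipeA[1]);
        return -1;
    }

    /* Make the pipe with the highest read descriptor the readable one. */
    if (pipeA[0] > pipeB[0]) {
        highRead = pipeA[0]; highWrite = pipeA[1]; lowRead = pipeB[0];
    } else {
        highRead = pipeB[0]; highWrite = pipeB[1]; lowRead = pipeA[0];
    }

    /* The master set holds every descriptor we want to watch. */
    FD_ZERO(&masterSet);
    FD_SET(lowRead, &masterSet);
    FD_SET(highRead, &masterSet);

    if (write(highWrite, "Hello", 6) == 6) {
        workingSet = masterSet;     /* select() overwrites the set it is given */
        timeout.tv_sec = 1;         /* avoid blocking forever if something is off */
        timeout.tv_usec = 0;

        if (select(highRead + 1, &workingSet, NULL, NULL, &timeout) > 0) {
            /* The idle descriptor has been cleared from the working copy. */
            *idleStillSet = FD_ISSET(lowRead, &workingSet) ? 1 : 0;
            *foundExclusive = countReadyFds(&workingSet, highRead, 0);
            *foundInclusive = countReadyFds(&workingSet, highRead, 1);
            rc = 0;
        }
    }

    close(pipeA[0]);
    close(pipeA[1]);
    close(pipeB[0]);
    close(pipeB[1]);
    return rc;
}
```

Calling runSelectDemo() from a small main() and printing the three values should show that the scan with `i < maxFd` finds nothing, the scan with `i <= maxFd` finds the readable descriptor, and the idle descriptor is no longer in the set after select() returns. Copying a master set before each call, as done here, is one alternative to rebuilding the set from scratch on every iteration.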

    Please see the following modifications to the OP's code:

    void *receiveMessageThread(void *pNodeInformation)
    {
        ...
    
        printf("ReceiveMessageThread: Receive thread created\n");
    
        while(!doneReceiving) {
            /* Initialize the read-set of file descriptors */
    
            /* Issue 2 fixed from here ... */
            FD_ZERO(&readFileDescriptorList);
    
            for(i=0, iterNodeInformation=nodeInformation ; i<MAXIMUM_NUMBER_OF_NODES ; i++, iterNodeInformation++) {
                FD_SET(iterNodeInformation->socketFd, &readFileDescriptorList);
    
                if (iterNodeInformation->socketFd > fileDescriptorMax) {
                    fileDescriptorMax = iterNodeInformation->socketFd;
                }
            }
            /* ... up to here. */
    
            printf("ReceiveMessageThread: fileDescriptorMax:%d\n", fileDescriptorMax);
    
            if (select(fileDescriptorMax+1, &readFileDescriptorList, NULL, NULL, NULL) == -1) {
                perror("Error receiveMessageThread, select failed \n");
                return NULL;    /* thread functions return void *, not int */
            }
    
            for(i=0 ; i <= fileDescriptorMax ; i++) { /* Issue 1 fixed here. */
                ...