Search code examples
csocketspipeposixposix-select

C POSIX processes pipe/socket communication exercise second process gets stuck


The idea is that I have a "main.c" program that launches using fork() two processes, P and G, as well as creates some file descriptors needed for the pipes (of the 3 pipes, only 1 is relevant for my doubt: the codes are adjusted accordingly).

These two processes communicate via a socket (G is the server, P a client). The P process also has two incoming pipe ends, from which it has to check if new data is incoming via a select. One of the pipes actually carries data coming from the G process, so that in the end there is a single piece of information that is looping between G and P, and it actually gets modified (through a predefined formula) while being in P, before being sent again to G. The other pipe is not relevant for the moment.

The problem I have is that after the first iteration, it seems like the G process gets stuck, thus no data gets sent through the pipe from G to P. However the select() in P keeps acknowledging that P is receiving something (so P again sends the data to G). But G displays nothing on the terminal.

The data that needs to be passed is a combination of the computed value and a timestamp, so I chose to create a struct. I use Ubuntu to test the code, and I compile with "gcc".

Here is the code I have (config.h, main.c, P.c, G.c):

config.h

//This header stores all the definitions that the nodes need
#ifndef CONFIG_H
#define CONFIG_H

#ifndef M_PI
#define M_PI 3.14159265358979323846
#endif

#define NEXT_IP '192.168.1.233' //IP address of the next machine in the chain
#define NEXT_PORT 5000          //chosen port for the communication

#define buff 250000
#define SIZE 10

#define h_addr h_addr_list[0] /* for backward compatibility */

int run_mode = 0; //set to 0 to go debug mode, 1 for multiple machine

float rf = 1; //sine wave frequency

useconds_t waiting_time = 1000000; //waiting time (in microseconds) applied by process P before sending the updated token

struct message
{
    time_t timestamp;
    float value;
    int status;
};
struct message msg[buff]; //definition of the message

typedef struct
{
    float token_value;
    time_t token_timestamp;
} token_struct;

#endif

main.c

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <errno.h>
#include <netdb.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <signal.h>
#include <syslog.h>
#include <fcntl.h>
#include <math.h>
#include "config.h"

// This is the main, you have to only execute this command: ./main (you can also run sudo netstat -tulpn for socket troubleshooting)
// The duty of this piece of code is to load config data and to launch all the needed processes (S, P, G and L)

int main(int argc, char *argv[])
{
    int P, G, S, L;

    int pfd1[2]; //file descriptors for pipe 1
    int pfd2[2]; //file descriptors for pipe 2
    int pfd3[2]; //file descriptors for pipe 3

    int wait_status = 0;

    if (pipe(pfd1) < 0) //error condition on pipe 1
    {
        perror("Pipe 1 creation error");
        return -1;
    }
    if (pipe(pfd2) < 0) //error condition on pipe 2
    {
        perror("Pipe 2 creation error");
        return -1;
    }
    if (pipe(pfd3) < 0) //error condition on pipe 2
    {
        perror("Pipe 3 creation error");
        return -1;
    }

    char read1[SIZE];
    char write1[SIZE];
    char read2[SIZE];
    char write2[SIZE];
    char read3[SIZE];
    char write3[SIZE];

    sprintf(read1, "%d", pfd1[0]);  //load the fd input/output (3rd arg.) into the char array (1st arg.),
    sprintf(write1, "%d", pfd1[1]); //while formatting it as stated in 2nd arg.
    sprintf(read2, "%d", pfd2[0]);
    sprintf(write2, "%d", pfd2[1]);
    sprintf(read3, "%d", pfd3[0]);
    sprintf(write3, "%d", pfd3[1]);

    argv[0] = read1;  //pipe1: read
    argv[1] = write1; //pipe1: write
    argv[2] = read2;  //pipe2: read
    argv[3] = write2; //pipe2: write
    argv[4] = read3;  //pipe3: read
    argv[5] = write3; //pipe3: write
                      //I will be passing to each node all the pipe ends, by transforming their fd in char and then reverting them to integers

    G = fork();

    if (G < 0) //error condition on fork
    {
        perror("Fork G");
        return -1;
    }

    if (G == 0) //G process
    {
        char *node_name = "./G";
        if (execvp(node_name, argv) < 0) //error handling for file G
        {
            perror("Exec failed for G");
            return -1;
        }
    }
    else if (G > 0)
    {
        P = fork();

        if (P < 0) //error condition on fork
        {
            perror("Fork P");
            return -1;
        }

        if (P == 0) //P process
        {
            char *node_name = "./P";
            if (execvp(node_name, argv) < 0) //error handling for file P
            {
                perror("Exec failed for P");
                return -1;
            }
        }

        printf("[all processes setup and running]\n");

        wait(&wait_status);
        close(pfd1[0]);
        close(pfd1[1]);
        close(pfd2[0]);
        close(pfd2[1]);
        close(pfd3[0]);
        close(pfd3[1]);
        return 0;
    }
}

P.c

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <errno.h>
#include <netdb.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <signal.h>
#include <syslog.h>
#include <fcntl.h>
#include <math.h>
#include "config.h"

//This node is the computational core. It is also the nevralgic waypoint of communications: all other nodes involved are
//in some way or another bond to P

/*
Instructions for compiling this node:
gcc P.c -o P -lm
*/

void error(const char *m) //Display a message about the error on stderr and then aborts the program
{
    perror(m);
    exit(0);
}

int main(int argc, char *argv[])
{
    close(atoi(argv[1]));
    close(atoi(argv[3]));
    close(atoi(argv[4]));

    int state = 1;

    token_struct token;
    token.token_value = 1;
    token.token_timestamp = time(NULL);

    struct timeval tv;
    int retval;

    float dt = 0; //time delay between reception and delivery time instants of the token
    clock_t t = 0;

    pid_t Ppid;
    Ppid = getpid();
    printf("P: my PID is %d\n", Ppid);
    //new token = received token + DT x (1. - (received token)^2/2) x 2 pi x RF

    struct message msg;
    char address[13] = "192.168.1.233";

    int sockfd; //socket file descriptor
    int portno; //stores the port number on which the server accepts connections
    int n;
    struct sockaddr_in serv_addr;
    struct hostent *server;
    portno = 5000;                            //port number definition
    sockfd = socket(AF_INET, SOCK_STREAM, 0); //create a new socket
    if (sockfd < 0)
    {
        error("Error creating a new socket\n");
    }

    if (!run_mode)
    {
        server = gethostbyname("ZenBook");
    }
    else
    {
        server = gethostbyname(address);
        portno = NEXT_PORT;
    }

    if (server == NULL)
    {
        fprintf(stderr, "Could not find matching host name\n");
        exit(0);
    }
    bzero((char *)&serv_addr, sizeof(serv_addr)); //the function bzero() sets all values inside a buffer to zero
    serv_addr.sin_family = AF_INET;               //this contains the code for the family of the address
    bcopy((char *)server->h_addr, (char *)&serv_addr.sin_addr.s_addr, server->h_length);
    serv_addr.sin_port = htons(portno);
    if (connect(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0)
        error("Connection failed");

    printf("[P node sending FIRST message]\n");
    n = write(sockfd, &(token), sizeof(token_struct));
    if (n < 0)
        error("Error writing to socket\n");

    while (1)
    {
        tv.tv_sec = 2;                   //amount of seconds the select listens for incoming data from either pipe 1 and 2
        tv.tv_usec = 0;                  //same as the previous line, but with microseconds
        fd_set readfds;                  //set of involved pipes from which P needs to read through the select
        FD_ZERO(&readfds);               //inizialization of the set
        FD_SET(atoi(argv[0]), &readfds); //addition of the desired pipe ends to the set
        FD_SET(atoi(argv[2]), &readfds);

        if (state == 1) //running situation
        {
            retval = select(atoi(argv[2]) + 1, &readfds, NULL, NULL, &tv);
            printf("retval: %d\n", retval);

            if (retval == -1)
            {
                perror("Select failed\n");
            }

            else if (retval > 0)
            {
                if (FD_ISSET(atoi(argv[0]), &readfds)) //read of first pipe (data coming from S) is ready
                {
                    //code
                }
                if (FD_ISSET(atoi(argv[2]), &readfds)) //read of second pipe (data coming from G) is ready
                {

                    //code



                    msg.timestamp = time(NULL); //get the current time

                    //This section is related to the communication with G, as the one with L is all set
                    t = clock() - t;
                    dt = ((float)t) / ((float)CLOCKS_PER_SEC); //by doing like this, the first cycle (and only that one) has a meaningless dt value
                        token.token_value = msg.value + dt * (((float)1) - (powf(msg.value, ((float)2)) / ((float)2))) * ((float)2) * ((float)M_PI) * rf;
                    t = clock();
                    printf("[P node sending message]\n");
                    usleep(waiting_time);
                    token.token_timestamp = msg.timestamp;
                    n = write(sockfd, &token, sizeof(token_struct));
                    if (n < 0)
                        error("Error writing to socket\n");
                }
            }
            else if (retval == 0)
                printf("No data written sent to pipes in the last 2 seconds\n");
        }

        else if (state == 0) //pausing sitation
        {
            //code
        }
    }
    close(atoi(argv[0]));
    close(atoi(argv[2]));
    close(atoi(argv[5]));
    close(sockfd);
    return 0;
}

G.c

#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <time.h>
#include <errno.h>
#include <netdb.h>
#include <sys/wait.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <signal.h>
#include <syslog.h>
#include <fcntl.h>
#include <math.h>
#include "config.h"

//This node can be run in 2 modes: debug mode (single machine) or normal mode (communicating with other PCs);
//in the first case it receives tokens from P and then sends them back to it, in the other scenario it still
//receives data from P, but the token is sent to another machine

void error(const char *m) //display a message about the error on stderr and then abort the program
{
    perror(m);
    exit(1);
}

int main(int argc, char *argv[])
{
    close(atoi(argv[0]));
    close(atoi(argv[1]));
    close(atoi(argv[2]));
    close(atoi(argv[4]));
    close(atoi(argv[5]));

    int sockfd; //socket file descriptor
    int newsockfd;
    int portno; //port of the server for the client connection
    socklen_t clilen;
    struct sockaddr_in serv_addr, cli_addr;
    int n;

    token_struct token;
    token.token_value = 5;
    token.token_timestamp = time(NULL);

    pid_t Gpid;
    Gpid = getpid();
    printf("G: my PID is %d\n", Gpid);

    sockfd = socket(AF_INET, SOCK_STREAM, 0); //create a new socket
    if (sockfd < 0)
        error("Error creating a new socket\n");
    bzero((char *)&serv_addr, sizeof(serv_addr)); //the function bzero() sets all values inside a buffer to zero
    portno = 5000;
    serv_addr.sin_family = AF_INET; //this contains the code for the family of the address
    serv_addr.sin_addr.s_addr = INADDR_ANY;
    serv_addr.sin_port = htons(portno);
    if (bind(sockfd, (struct sockaddr *)&serv_addr, sizeof(serv_addr)) < 0) //the bind() system call binds a socket to an address
        error("Error on binding\n");
    listen(sockfd, 5); //system call that allows the process to listen for connections over the socket
    printf("[G node waiting for messages]\n");
    if (!run_mode)
    {
        while (1)
        {
            clilen = sizeof(cli_addr);
            newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, (socklen_t *)&clilen);
            //The accept() system call causes the process to block until a client connects to the server
            if (newsockfd < 0)
            {
                perror("'accept()' system call failed\n");
                return 1;
            }
            else
            {
                puts("Connection accepted\n");

                n = read(newsockfd, &(token), sizeof(token_struct));
                if (n < 0)
                    error("Error reading from socket\n");

                printf("Here is the message: %f | received at: %li\n", token.token_value, token.token_timestamp);
                write(atoi(argv[3]), &(token), sizeof(token_struct));
                printf("G: I tried to write on the pipe\n");
            }
        }
    }
    else //This is the code relative to the multiple machine case
    {
        while (1)
        {
            //code
        }
    }

    close(sockfd);
    return 0;
}

Thanks to anybody willing to help. I have tried to do it on my own for several days now, with no success. I am not a programmer by any means and this project has been a great challenge for me so far.


Solution

  • The loop in G.c needs to be reworked. Looks like what you do is accept() a connect request, read from the socket then loop back up to the accept() again at which point process G then blocks waiting for a new connection request to come in.

    You need to do the accept() before going into the loop and the loop just keeps reading from the socket using a blocking call.

    The select() in P.c has a time out which means that if nothing is received before the timer expires, the select() will return indicating a timeout. I don't see where you are actually reading from the socket in P. If you don't read the data waiting on the socket then when you do the select() again in the loop to check for further data, you will get an indication there is still data to be read. This may be why you are seeing the messages from P despite nothing further being sent by G.

    So you need something like the following in G.c. This is just thrown together and is not tested but it reads right. There may be a syntax error in this code fragment so you will need test and adjust it more than likely.

    {
            clilen = sizeof(cli_addr);
            newsockfd = accept(sockfd, (struct sockaddr *)&cli_addr, (socklen_t *)&clilen);
            //The accept() system call causes the process to block until a client connects to the server
            if (newsockfd < 0)
            {
                perror("'accept()' system call failed\n");
                return 1;
            }
            else {
                puts("Connection accepted\n");
            }
        while (1)
        {
            {
                n = read(newsockfd, &(token), sizeof(token_struct));
                if (n < 0)
                    error("Error reading from socket\n");
    
                printf("Here is the message: %f | received at: %li\n", token.token_value, token.token_timestamp);
                write(atoi(argv[3]), &(token), sizeof(token_struct));
                printf("G: I tried to write on the pipe\n");
            }
        }
    }
    

    And in P.c you need to read from the socket after the select() completes.