Search code examples
cunixpipeexecexecvp

C fork loop to exec infinite commands (urandom/tail on an active file)


I'm trying to reproduce the behaviour of a pipe in a Unix (OSX) shell (like bash) in C. I have no problem handling a command that terminates on its own (for example: ls | wc -c).

But as you can imagine, I have an issue: I can't handle a command that never terminates, for instance: base64 /dev/urandom | head -c 1000. In a shell, the output of this command is immediately the first 1000 characters produced from urandom.

My function just waits for the end of the urandom command, which is infinite... So I need to kill the process with "CTRL + C" (and handle the signal so that it reaches only the child and not my function) to print the first 1000 characters.

My function to execute all the pipes:

#define READ_END        0
#define WRITE_END       1


/*
 * Runs the commands in argv one after another, connecting each one to
 * the next through a pipe.
 *
 * For every command a new pipe is created and a child is forked.  The
 * child reads from fd_save (standard input for the first command, the
 * read end of the previous pipe afterwards), writes into the new pipe
 * when another entry follows, and then execvp()s the command.
 */
void    exec_pipes(char ***argv)
{
    int   p[2];
    /* descriptor the next child reads from; 0 (stdin) for the first one */
    int   fd_save = 0;

    while (*argv)
    {
        pipe(p);
        if (fork() == 0)
        {
            /* child: take input from the previous stage */
            dup2(fd_save, STDIN_FILENO);
            /* only redirect output when another entry follows this one */
            if (*(argv + 1))
                dup2(p[WRITE_END], STDOUT_FILENO);
            close(p[READ_END]);
            execvp(**argv, *argv);
            /* only reached when execvp() failed */
            exit(EXIT_FAILURE);
        }
        else
        {
            /*
             * Blocks until a child has terminated, so the next command
             * is only forked after the current one has finished.
             * NOTE(review): with a command that never terminates this
             * call never returns, because its reader has not been
             * started yet.
             */
            wait(NULL);
            /* the next child reads what this one wrote */
            fd_save = p[READ_END];
            close(p[WRITE_END]);
            /*
             * NOTE(review): this increments the pointer stored in *argv,
             * not argv itself -- confirm against the caller whether
             * `argv++` (advance to the next command) was intended.
             */
            (*argv)++;
        }
    }
}

Here, you can check my entire code with the main : https://www.onlinegdb.com/Hkbjd3WOz

Thanks in advance.


Solution

  • No, you don't. When the reading process finishes, it closes the reading side of the pipe, so the writer is not allowed to write to the pipe anymore and gets an EPIPE error (cannot write to pipe) from the system.

    For this to occur, no process may have the pipe open for reading: the kernel counts the processes that have each side of the pipe open, and will not give this error while at least one process still has it open for reading (this means the first process in the chain has to close the read file descriptor of the pipe if it is not going to read from it).

    Then, you have to close the descriptor not used (it depends if you want the parent to be the writer or the reader) IN THE PARENT AND CHILD, and dup2(2) the other descriptor (again, IN THE PARENT AND CHILD) to file descriptor 0 (in) or 1 (out). (first to say, I've changed your /dev/urandom example by a call to the yes command, because it's standard in all unix flavours, and produces also infinite output)

    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    
    /* Program executed on the writing side of the pipe, and its
     * NULL-terminated argument vector (argv[0] is the program name,
     * the second "yes" is its single argument). */
    char *writer_program = "yes";
    char *argv_writer[] = {
        "yes", "yes", NULL,
    };
    /* Program executed on the reading side of the pipe, and its
     * NULL-terminated argument vector. */
    char *reader_program = "head";
    char *argv_reader[] = {
        "head", "-c", "10", NULL,
    };
    
    /*
     * Connects writer_program to reader_program through a pipe.
     *
     * The child executes the writer with its standard output redirected
     * to the write end of the pipe; the parent executes the reader with
     * its standard input redirected to the read end.  Each side closes
     * the end it does not use.
     *
     * Only returns control (with EXIT_FAILURE) when pipe(), fork(),
     * dup2() or execvp() fails.
     */
    int main()
    {
        int pipe_fds[2];
        int res;

        if (pipe(pipe_fds) < 0) {     /* PLEASE, CHECK FOR ERRORS. */
            perror("pipe");
            exit(EXIT_FAILURE);
        }
        if ((res = fork()) < 0) {     /* PLEASE, CHECK FOR ERRORS. */
            perror("fork");
        } else if (res == 0) {        /* child process, ACTING AS WRITER */
            close(pipe_fds[0]);       /* close the read end, the writer never reads */
            if (dup2(pipe_fds[1], 1) < 0) { /* write end becomes stdout */
                perror("dup2");
                exit(EXIT_FAILURE);
            }
            execvp(writer_program, argv_writer);
            perror("execvp");         /* PLEASE, CHECK FOR ERRORS. */
        } else {                      /* parent process, ACTING AS A READER */
            close(pipe_fds[1]);       /* close the write end, the reader never writes */
            if (dup2(pipe_fds[0], 0) < 0) { /* read end becomes stdin */
                perror("dup2");
                exit(EXIT_FAILURE);
            }
            execvp(reader_program, argv_reader);
            perror("execvp");         /* PLEASE, CHECK FOR ERRORS. */
        }
        /* BOTH, THIS IS REACHED IN CASE OF FAILURE (fork fails, or any
         * of the exec(2) calls  fail. */
        exit(EXIT_FAILURE);
    }
    

    EDIT

    Below is a complete example with a pipeline chain equivalent to the next:

    dd if=/dev/urandom ibs=1024 | base64 | head -n 150 | sort -r | pr
    

    In this case, the first program to die is head, as you wanted, and every other program dies after it. The parent program waits for all its children to die, and every system call has been wrapped, so you get a trace of execution on stderr to see what happens.

    #include <sys/types.h>
    #include <sys/wait.h>
    #include <stdio.h>
    #include <errno.h>
    #include <stdlib.h>
    #include <string.h>
    #include <unistd.h>
    
    /*
     * Prefixes a printf-style format with "[pid=N]:file:line:function: ".
     * Expands to the format string FOLLOWED BY three arguments, so it can
     * only be used where a format and its leading arguments are expected.
     */
    #define F(fmt) "[pid=%d]:"__FILE__":%d:%s: " fmt, getpid(), __LINE__, __func__

    /* Number of elements of a true array (not of a pointer). */
    #define SIZE(a) (sizeof (a)/sizeof *(a))

    /* Reports the message together with strerror(errno) and terminates
     * the calling process with EXIT_FAILURE. */
    #define ERR(fmt, ...) do{\
                fprintf(stderr, \
                        F(fmt": %s (errno = %d)\n"),\
                        ##__VA_ARGS__,\
                        strerror(errno),\
                        errno);\
                exit(EXIT_FAILURE);\
        }while(0)

    /* Closes a descriptor and traces it.  A failing close() is reported
     * (but is not fatal) instead of being traced as if it had worked. */
    #define CLOSE(expr) do{\
                if(close(expr) < 0)\
                        fprintf(stderr, \
                                F("close("#expr" === %d): %s (errno = %d)\n"), \
                                (expr), strerror(errno), errno);\
                else\
                        fprintf(stderr, \
                                F("close("#expr" === %d);\n"), \
                                (expr));\
        }while(0)

    /* Creates a pipe in the int[2] array `var`; fatal on error. */
    #define PIPE(var) do{\
                if(pipe(var)<0) ERR("pipe"); \
                fprintf(stderr,F("PIPE ==> %d, %d\n"), (var)[0], (var)[1]);\
        }while(0)

    /* fork() wrapper; stores the result in a variable named `res` that
     * must be in scope at the point of use.  Fatal on error. */
    #define FORK() do{\
                if((res = fork()) < 0)ERR("fork");\
                fprintf(stderr,F("FORK ==> %d\n"), res);\
        }while(0)

    /* dup2() wrapper; also stores its result in `res`.  Fatal on error. */
    #define DUP(expr1, expr2) do{\
                if((res = dup2(expr1, expr2)) < 0) ERR("dup");\
                fprintf(stderr,\
                        F("DUP("#expr1" === %d, "#expr2" === %d);\n"),\
                        (expr1), (expr2));\
        }while(0)
    
    
    /* NULL-terminated argument vectors, one per stage of the pipeline. */
    char *argv_DISK_DUMP[] = {
        "dd", "if=/dev/urandom", "ibs=1024", NULL
    };
    char *argv_BASE64[] = {
        "base64", NULL
    };
    char *argv_HEAD[] = {
        "head", "-n", "150", NULL
    };
    char *argv_SORT[] = {
        "sort", "-r", NULL
    };
    char *argv_PR[] = {
        "pr", NULL
    };

    /*
     * One stage of the pipeline.  The table below lists the stages in
     * the order in which they are connected.
     */
    struct pipe_program {
        pid_t pid;      /* pid of the process running this stage */
        pid_t ppid;     /* pid of that process' parent */
        char *pname;    /* program name handed to execvp() */
        char **argv;    /* argument vector handed to execvp() */
    } pipe_programs[] = {
        { .pid = 0, .ppid = 0, .pname = "dd",     .argv = argv_DISK_DUMP },
        { .pid = 0, .ppid = 0, .pname = "base64", .argv = argv_BASE64 },
        { .pid = 0, .ppid = 0, .pname = "head",   .argv = argv_HEAD },
        { .pid = 0, .ppid = 0, .pname = "sort",   .argv = argv_SORT },
        { .pid = 0, .ppid = 0, .pname = "pr",     .argv = argv_PR },
    };
    
    /* number of entries in pipe_programs[], i.e. of stages in the pipeline */
    size_t pipe_programs_n = SIZE(pipe_programs);
    
    /* writes a one-line description of entry `ix` (pids, program name
     * and arguments) to stderr */
    static size_t printus(int ix, struct pipe_program *p);
    /* wait(2) wrapper that also traces the returned pid on stderr */
    static pid_t WAIT(int *status);
    
    /*
     * Runs every entry of pipe_programs[] as one stage of a pipeline.
     *
     * Each stage but the last gets a fresh pipe on its standard output;
     * each stage but the first reads from the pipe created for the stage
     * before it.  The parent keeps no pipe descriptor open once a stage
     * has been started, then waits for all the children and reports each
     * one as it finishes.  Every wrapped call traces itself on stderr.
     */
    int main()
    {
        int res;    /* the FORK() and DUP() macros store their result here */
        size_t i;
        struct pipe_program *p = pipe_programs;
        int input_fd = 0; /* first process is connected to standard input */
        static int pipe_fds[2] = { -1, -1 };

        /* every stage except the last one writes into a new pipe */
        for (i = 0; i + 1 < pipe_programs_n; i++, p++) {

                PIPE(pipe_fds);

                FORK();

                if (res == 0) { /* child process, we have redirected nothing yet. */

                        p->pid = getpid();
                        p->ppid = getppid();

                        /* print who we are */
                        printus((int)i, p);

                        /* redirect input, if needed */
                        if (input_fd != 0) {
                                DUP(input_fd, 0);
                                CLOSE(input_fd); /* not used after redirection */
                        }

                        /* the read end belongs to the next stage, not to us */
                        CLOSE(pipe_fds[0]);

                        /* and output */
                        DUP(pipe_fds[1], 1);
                        CLOSE(pipe_fds[1]);

                        execvp(p->pname, p->argv);

                        ERR("execvp: %s", p->pname);
                        /* NOTREACHED */

                }
                /* parent process */

                /* save pid to be used later, when reporting finished children */
                p->pid = res;
                p->ppid = getpid();

                /* the parent never writes: keeping this open would stop the
                 * reader of the pipe from ever seeing end-of-file */
                CLOSE(pipe_fds[1]);

                /* if we have an old input_fd, then close it */
                if (input_fd != 0) {
                        CLOSE(input_fd);
                }

                /* ... and save pipe read descriptor for the next stage */
                input_fd = pipe_fds[0];
        } /* for */

        /* now input_fd is connected to the output of the last process started */
        FORK();
        if (res == 0) { /* child, last process in the pipe */

                p->pid = getpid();
                p->ppid = getppid();

                /* print who we are */
                printus((int)i, p);

                /* redirect input */
                if (input_fd != 0) {
                        DUP(input_fd, 0);
                        CLOSE(input_fd); /* not used after redirection */
                }

                /* this time no output redirection */

                execvp(p->pname, p->argv);

                ERR("execvp: %s", p->pname);
                /* NOTREACHED */
        }

        /* record the last child too, so that it is recognised below */
        p->pid = res;
        p->ppid = getpid();

        /* pipe_fds[1] was already closed inside the loop; only the read
         * end of the last pipe is still open in the parent */
        if (input_fd != 0) {
                CLOSE(input_fd);
        }

        /* parent code... we did pipe_programs_n fork()'s so we
         * have to do pipe_programs_n wait()'s */
        int status;
        pid_t cld_pid;
        /* while we can wait for a child */
        while ((cld_pid = WAIT(&status)) > 0) {
                for (i = 0, p = pipe_programs; i < pipe_programs_n; i++, p++) {
                        if (cld_pid == p->pid) {
                                fprintf(stderr,
                                        F("Child finished: pid = %d\n"),
                                        cld_pid);
                                printus((int)i, p);
                                break;
                        }
                }
        } /* while */
        exit(EXIT_SUCCESS);
    }
    
    /*
     * Writes a one-line description of a pipeline entry to stderr:
     * its index, pid, parent pid, program name and argument list.
     *
     * ix: index of the entry, printed as given.
     * p:  entry to describe; p->argv must be NULL-terminated.
     *
     * The line is assembled in a fixed-size static buffer and is
     * truncated if it does not fit.  Returns the number of characters
     * written to stderr.
     */
    static size_t printus(int ix, struct pipe_program *p)
    {
        static char buffer[1024];
        size_t used = 0;    /* characters stored so far, without the NUL */
        int n;
        int j;

    /* snprintf() returns the length it WOULD have written, which can be
     * larger than the space left; clamp so that `used` never moves past
     * the terminating NUL and the remaining size never wraps around. */
    #define ACT() do{\
                if (n > 0) used += (size_t)n;\
                if (used >= sizeof buffer) used = sizeof buffer - 1;\
        }while(0)

        buffer[0] = '\0';

        n = snprintf(buffer + used, sizeof buffer - used,
                F("%d: pid = %d, ppid = %d, program = \"%s\": args = "),
                ix, p->pid, p->ppid, p->pname);
        ACT();
        for (j = 0; p->argv[j]; j++) {
                n = snprintf(buffer + used, sizeof buffer - used,
                        "%s\"%s\"",
                        j ? ", " : "{",
                        p->argv[j]);
                ACT();
        }
        n = snprintf(buffer + used, sizeof buffer - used, "};\n");
        ACT();

    #undef ACT

        fputs(buffer, stderr);

        return used;
    }
    
    /*
     * wait(2) wrapper: waits for a child, traces the value wait()
     * returned on stderr and hands that same value back to the caller.
     * `status` is passed to wait() untouched.
     */
    static pid_t WAIT(int *status)
    {
        const pid_t reaped = wait(status);

        fprintf(stderr, F("WAIT() ==> %d\n"), reaped);

        return reaped;
    }
    

    you can get the whole program with the following command:

    git clone [email protected]:mojadita/pipe.git
    

    beware that the program there is almost exactly equal, but some work has been done to facilitate the writing of different pipelines without having to touch several places in the main source file. Edit the file pipe.i if you download the program from the git server and you want to modify the pipeline. Then make and voilá!!! :) The program has been tested on linux, freebsd and mac osx, so I think you'll have no problem to adapt it to your needs. If you get it from the github, you'll also have a Makefile.