Deadlock when using fork, exec and pipes in a parallel environment


I am spawning a child process using fork and exec, with two pipes to provide input to that process and receive its output.

It works just fine most of the time, but when I use something like OpenMP to test how it performs in concurrent environments, it sometimes hangs in the read syscall or in waitpid.

When I ran strace on the child process, I found that it is also blocked on the read syscall. That is weird, because the parent process only waits on a read after it has provided all of its input and closed the write end of the pipe.

I tried to create an MCVE, but it is fairly long and I don't know how to make it any shorter. I removed most of the error-checking code for the sake of simplicity.

Note that there are no globals in my code, and I'm not trying to read from or write to the same file descriptors in multiple threads.

I can't think of what could go wrong, so hopefully you can spot my mistake.

Here it goes:

#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <limits.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>

size_t
min(size_t first, size_t second)
{
    if(first < second)
    {
        return first;
    }

    return second;
}

struct RDI_Buffer
{
    char* data;
    size_t size;
};

typedef struct RDI_Buffer RDI_Buffer;

RDI_Buffer
rdi_buffer_init()
{
    RDI_Buffer b = {0};
    return b;
}

RDI_Buffer
rdi_buffer_new(size_t size)
{
    RDI_Buffer b;

    b.data = malloc(size);
    b.size = size;
    return b;
}

void
rdi_buffer_free(RDI_Buffer b)
{
    if(!b.data)
    {
        return;
    }

    free(b.data);
}

RDI_Buffer
rdi_buffer_resize(RDI_Buffer b, size_t new_size)
{
    if(!b.data)
    {
        return rdi_buffer_new(new_size);
    }

    char* new_data = realloc(b.data, new_size);

    if(new_data)
    {
        b.size = new_size;
        b.data = new_data;
        return b;
    }

    RDI_Buffer output = rdi_buffer_new(new_size);
    memcpy(output.data, b.data, output.size);
    rdi_buffer_free(b);
    return output;
}

RDI_Buffer
rdi_buffer_null_terminate(RDI_Buffer b)
{
    b = rdi_buffer_resize(b, b.size + 1);
    b.data[b.size - 1] = '\0';
    return b;
}

static RDI_Buffer
rw_from_fd(int w_fd, int r_fd, RDI_Buffer input)
{
    const size_t CHUNK_SIZE = 4096;

    assert(input.size <= CHUNK_SIZE);

    write(w_fd, input.data, input.size);
    close(w_fd);

    RDI_Buffer output = rdi_buffer_new(CHUNK_SIZE);

    read(r_fd, output.data, CHUNK_SIZE);

    close(r_fd);
    return output;
}

int main()
{
#pragma omp parallel for
    for(size_t i = 0; i < 100; i++)
    {
        char* thing =
                "Hello this is a sort of long text so that we can test how "
                "well this works. It should go with cat and be printed.";

        RDI_Buffer input_buffer;
        input_buffer.data = thing;
        input_buffer.size = strlen(thing);

        int main_to_sub[2];
        int sub_to_main[2];

        pipe(main_to_sub);
        pipe(sub_to_main);

        int pid = fork();

        if(pid == 0)
        {
            dup2(main_to_sub[0], STDIN_FILENO);
            dup2(sub_to_main[1], STDOUT_FILENO);

            close(main_to_sub[1]);
            close(main_to_sub[0]);
            close(sub_to_main[1]);
            close(sub_to_main[0]);

            char* argv[] = {"cat", NULL};

            execvp("cat", argv);
            exit(1);
        }

        close(main_to_sub[0]);
        close(sub_to_main[1]);

        RDI_Buffer output =
                rw_from_fd(main_to_sub[1], sub_to_main[0], input_buffer);

        int *status = NULL;
        waitpid(pid, status, 0);

        if(status)
        {
            printf("%d\n", *status);
        }

        output = rdi_buffer_null_terminate(output);

        if(strcmp(output.data, thing) == 0)
        {
            printf("good\n");
        }
        else
        {
            printf("bad\n");
        }

        rdi_buffer_free(output);
    }
}

Make sure you compile and link with -fopenmp, like so: gcc main.c -fopenmp


Solution

  • While your main program is hung, run lsof in a separate session. I think you will see something like:

    ....
    cat       5323                 steve  txt       REG              252,0    52080    6553613 /bin/cat
    cat       5323                 steve  mem       REG              252,0  1868984   17302005 /lib/x86_64-linux-gnu/libc-2.23.so
    cat       5323                 steve  mem       REG              252,0   162632   17301981 /lib/x86_64-linux-gnu/ld-2.23.so
    cat       5323                 steve  mem       REG              252,0  1668976   12849924 /usr/lib/locale/locale-archive
    cat       5323                 steve    0r     FIFO               0,10      0t0      32079 pipe
    cat       5323                 steve    1w     FIFO               0,10      0t0      32080 pipe
    cat       5323                 steve    2u      CHR              136,0      0t0          3 /dev/pts/0
    cat       5323                 steve    3r     FIFO               0,10      0t0      32889 pipe
    cat       5323                 steve    4w     FIFO               0,10      0t0      32889 pipe
    cat       5323                 steve    6r     FIFO               0,10      0t0      32890 pipe
    cat       5323                 steve    7r     FIFO               0,10      0t0      34359 pipe
    cat       5323                 steve    8w     FIFO               0,10      0t0      32890 pipe
    cat       5323                 steve   10r     FIFO               0,10      0t0      22504 pipe
    cat       5323                 steve   15w     FIFO               0,10      0t0      22504 pipe
    cat       5323                 steve   16r     FIFO               0,10      0t0      22505 pipe
    cat       5323                 steve   31w     FIFO               0,10      0t0      22505 pipe
    cat       5323                 steve   35r     FIFO               0,10      0t0      17257 pipe
    cat       5323                 steve   47r     FIFO               0,10      0t0      31304 pipe
    cat       5323                 steve   49r     FIFO               0,10      0t0      30264 pipe
    

    This raises the question: where are all those pipes coming from? Your main loop is no longer a single loop; it is a set of unsynchronized parallel loops. Consider the boilerplate below:

    void *tdispatch(void *p) {
        int to[2], from[2];
        pipe(to);
        pipe(from);
        if (fork() == 0) {
            ...
        } else {
            ...
            pthread_exit(0);
        }
    }
    ...
    for (int i = 0; i < NCPU; i++) {
        pthread_create(..., tdispatch, ...);
    }
    for (int i = 0; i < NCPU; i++) {
        pthread_join(...);
    }
    

    Multiple instances of tdispatch can interleave their pipe(to), pipe(from), and fork() calls, so each forked process also inherits whatever pipe fds the other threads had open at the moment of its fork. I say the fds are leaking because the forked process has no idea that they are there, and so never closes them.
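
To see that a forked child really does pick up descriptors it never asked for, here is a minimal sketch in C. The function name child_inherits_unrelated_fd and the pipe called other are hypothetical; other stands in for a pipe that a different thread happened to create just before this fork.

```c
#include <fcntl.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Returns 1 if a forked child still holds an fd that the parent opened
 * for an unrelated purpose, 0 if it does not, -1 on error. */
int child_inherits_unrelated_fd(void)
{
    int other[2];   /* stands in for another thread's pipe */
    if (pipe(other) != 0) return -1;

    pid_t pid = fork();
    if (pid < 0) {
        close(other[0]);
        close(other[1]);
        return -1;
    }
    if (pid == 0) {
        /* fcntl(F_GETFD) succeeds only on an open descriptor. */
        _exit(fcntl(other[1], F_GETFD) != -1 ? 1 : 0);
    }

    close(other[0]);
    close(other[1]);

    int status = 0;
    if (waitpid(pid, &status, 0) < 0 || !WIFEXITED(status)) return -1;
    return WEXITSTATUS(status);
}
```

The child never created or used other, yet the write end is open in it. Unless the close-on-exec flag is set, such descriptors also survive exec, which is why they show up in the lsof listing for cat.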

    A read() on a pipe reports end-of-file only when the pipe has no buffered data and no write file descriptor remains open to it, in any process. While at least one write descriptor is still open, a read() on an empty pipe blocks.

    Suppose process 5 has its normal two ends of two pipes open, pointing to pipe#10 and pipe#11, and process 6 has pipe#12 and pipe#13. But, owing to the leaking above, process 5 also has the write end of pipe#12, and process 6 has the write end of pipe#10. Processes 5 and 6 will never exit, because each is keeping the other's input pipe open, so neither ever sees end-of-file.
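
The end-of-file rule above can be checked inside a single process. This is only a sketch: dup() stands in for the copy of the write end that leaked into some other child, O_NONBLOCK is set purely so the demo reports "would block" instead of hanging, and the names read_state and leaked_writer_blocks_eof are hypothetical.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* 0 = EOF, 1 = read would block (a writer still exists), -1 = other. */
static int read_state(int rfd)
{
    char c;
    ssize_t n = read(rfd, &c, 1);
    if (n == 0) return 0;
    if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return 1;
    return -1;
}

/* Returns 1 if EOF appeared only after the leaked writer was closed. */
int leaked_writer_blocks_eof(void)
{
    int p[2];
    if (pipe(p) != 0) return -1;

    int flags = fcntl(p[0], F_GETFL);
    int leaked = dup(p[1]);          /* the "leaked" copy of the write end */
    if (flags < 0 || leaked < 0 ||
        fcntl(p[0], F_SETFL, flags | O_NONBLOCK) < 0) {
        if (leaked >= 0) close(leaked);
        close(p[0]);
        close(p[1]);
        return -1;
    }

    close(p[1]);                     /* the owner closes its write end... */
    int before = read_state(p[0]);   /* ...but the leaked copy blocks EOF */
    close(leaked);
    int after = read_state(p[0]);    /* now the reader sees EOF */
    close(p[0]);
    return before == 1 && after == 0;
}
```

With a blocking descriptor, the first read would simply hang, which is the state the stuck cat processes are in.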

    The solution is pretty much what people were saying earlier: threads and forks are a tricky combination. You would have to serialize the pipe, fork, and initial-close steps so that no other thread can fork between them.
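
One way to serialize those steps, sketched here with pthreads rather than OpenMP, is to hold a mutex from the first pipe() call until the parent has closed the child's ends. Everything named here (spawn_lock, echo_through_cat, worker, run_parallel) is hypothetical, and the sketch assumes cat is on the PATH and that the input fits in the pipe buffer.

```c
#include <pthread.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical lock: only one thread at a time may run pipe/fork/close. */
static pthread_mutex_t spawn_lock = PTHREAD_MUTEX_INITIALIZER;

/* Pipe `input` through cat; return 1 if the echoed bytes match, else 0.
 * Assumes `input` is short enough to fit in the pipe buffer and in `out`. */
static int echo_through_cat(const char *input)
{
    int to_child[2], from_child[2];
    char *argv[] = {"cat", NULL};
    char out[4096];
    size_t len = strlen(input), got = 0;
    ssize_t n;

    pthread_mutex_lock(&spawn_lock);
    if (pipe(to_child) != 0) {
        pthread_mutex_unlock(&spawn_lock);
        return 0;
    }
    if (pipe(from_child) != 0) {
        close(to_child[0]);
        close(to_child[1]);
        pthread_mutex_unlock(&spawn_lock);
        return 0;
    }
    pid_t pid = fork();
    if (pid == 0) {
        dup2(to_child[0], STDIN_FILENO);
        dup2(from_child[1], STDOUT_FILENO);
        close(to_child[0]);
        close(to_child[1]);
        close(from_child[0]);
        close(from_child[1]);
        execvp("cat", argv);
        _exit(127);               /* reached only if exec failed */
    }
    /* Initial close: drop the child's ends before another thread can fork. */
    close(to_child[0]);
    close(from_child[1]);
    pthread_mutex_unlock(&spawn_lock);

    if (pid < 0) {
        close(to_child[1]);
        close(from_child[0]);
        return 0;
    }

    ssize_t written = write(to_child[1], input, len);
    close(to_child[1]);           /* cat sees EOF once every writer is closed */

    while (got < sizeof out &&
           (n = read(from_child[0], out + got, sizeof out - got)) > 0) {
        got += (size_t)n;
    }
    close(from_child[0]);
    waitpid(pid, NULL, 0);

    return written == (ssize_t)len && got == len &&
           memcmp(out, input, len) == 0;
}

static void *worker(void *arg)
{
    return echo_through_cat((const char *)arg) ? arg : NULL;
}

/* Run `nthreads` workers (capped at 64); return how many echoes matched. */
int run_parallel(int nthreads)
{
    static char msg[] = "Hello this is a sort of long text.";
    pthread_t tids[64];
    int started = 0, ok = 0;

    if (nthreads > 64) nthreads = 64;
    for (int i = 0; i < nthreads; i++) {
        if (pthread_create(&tids[started], NULL, worker, msg) != 0) break;
        started++;
    }
    for (int i = 0; i < started; i++) {
        void *res = NULL;
        if (pthread_join(tids[i], &res) == 0 && res != NULL) ok++;
    }
    return ok;
}
```

Note that the ends the parent keeps can still be inherited by children forked later, so an earlier cat may not see end-of-file until a later one exits. Because a child can only inherit pipes that existed before its own fork, these dependencies cannot form a cycle, so the workers finish rather than deadlock. Setting FD_CLOEXEC on the ends the parent keeps is a common complement that removes the dependency altogether.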