I am spawning a child process using fork and exec, with two pipes to provide input to and receive output from that process.
It works fine most of the time, but when I use something like OpenMP to test how it performs in concurrent environments, it sometimes hangs in the read syscall or in waitpid.
When I straced the child process, I found that it is also blocked on the read syscall. That is weird, because I only wait on reading in the parent process after I've provided all of my input and closed the write end of the pipe.
I tried to create an MCVE, but it is sort of long and I don't know how to make it any shorter. I removed most of the error-checking code for the sake of simplicity.
Note that there are no globals in my code, and I'm not trying to read/write from the same file descriptors in multiple threads.
I can't think of what could go wrong, so hopefully you can spot what I'm doing wrong.
Here goes:
#include <string.h>
#include <assert.h>
#include <unistd.h>
#include <limits.h>
#include <sys/wait.h>
#include <stdio.h>
#include <stdlib.h>

size_t
min(size_t first, size_t second)
{
    if(first < second)
    {
        return first;
    }
    return second;
}

struct RDI_Buffer
{
    char* data;
    size_t size;
};
typedef struct RDI_Buffer RDI_Buffer;

RDI_Buffer
rdi_buffer_init()
{
    RDI_Buffer b = {0};
    return b;
}

RDI_Buffer
rdi_buffer_new(size_t size)
{
    RDI_Buffer b;
    b.data = malloc(size);
    b.size = size;
    return b;
}

void
rdi_buffer_free(RDI_Buffer b)
{
    if(!b.data)
    {
        return;
    }
    free(b.data);
}

RDI_Buffer
rdi_buffer_resize(RDI_Buffer b, size_t new_size)
{
    if(!b.data)
    {
        return rdi_buffer_new(new_size);
    }
    char* new_data = realloc(b.data, new_size);
    if(new_data)
    {
        b.size = new_size;
        b.data = new_data;
        return b;
    }
    RDI_Buffer output = rdi_buffer_new(new_size);
    memcpy(output.data, b.data, output.size);
    rdi_buffer_free(b);
    return output;
}

RDI_Buffer
rdi_buffer_null_terminate(RDI_Buffer b)
{
    b = rdi_buffer_resize(b, b.size + 1);
    b.data[b.size - 1] = '\0';
    return b;
}

static RDI_Buffer
rw_from_fd(int w_fd, int r_fd, RDI_Buffer input)
{
    const size_t CHUNK_SIZE = 4096;
    assert(input.size <= CHUNK_SIZE);
    write(w_fd, input.data, input.size);
    close(w_fd);
    RDI_Buffer output = rdi_buffer_new(CHUNK_SIZE);
    read(r_fd, output.data, CHUNK_SIZE);
    close(r_fd);
    return output;
}

int main()
{
    #pragma omp parallel for
    for(size_t i = 0; i < 100; i++)
    {
        char* thing =
            "Hello this is a sort of long text so that we can test how "
            "well this works. It should go with cat and be printed.";
        RDI_Buffer input_buffer;
        input_buffer.data = thing;
        input_buffer.size = strlen(thing);
        int main_to_sub[2];
        int sub_to_main[2];
        pipe(main_to_sub);
        pipe(sub_to_main);
        int pid = fork();
        if(pid == 0)
        {
            dup2(main_to_sub[0], STDIN_FILENO);
            dup2(sub_to_main[1], STDOUT_FILENO);
            close(main_to_sub[1]);
            close(main_to_sub[0]);
            close(sub_to_main[1]);
            close(sub_to_main[0]);
            char* argv[] = {"cat", NULL};
            execvp("cat", argv);
            exit(1);
        }
        close(main_to_sub[0]);
        close(sub_to_main[1]);
        RDI_Buffer output =
            rw_from_fd(main_to_sub[1], sub_to_main[0], input_buffer);
        int *status = NULL;
        waitpid(pid, status, 0);
        if(status)
        {
            printf("%d\n", *status);
        }
        output = rdi_buffer_null_terminate(output);
        if(strcmp(output.data, thing) == 0)
        {
            printf("good\n");
        }
        else
        {
            printf("bad\n");
        }
        rdi_buffer_free(output);
    }
}
Make sure you compile and link with -fopenmp, like so: gcc main.c -fopenmp
When your main is hung, type lsof in a separate session. I think you will see something like:
....
cat 5323 steve txt REG 252,0 52080 6553613 /bin/cat
cat 5323 steve mem REG 252,0 1868984 17302005 /lib/x86_64-linux-gnu/libc-2.23.so
cat 5323 steve mem REG 252,0 162632 17301981 /lib/x86_64-linux-gnu/ld-2.23.so
cat 5323 steve mem REG 252,0 1668976 12849924 /usr/lib/locale/locale-archive
cat 5323 steve 0r FIFO 0,10 0t0 32079 pipe
cat 5323 steve 1w FIFO 0,10 0t0 32080 pipe
cat 5323 steve 2u CHR 136,0 0t0 3 /dev/pts/0
cat 5323 steve 3r FIFO 0,10 0t0 32889 pipe
cat 5323 steve 4w FIFO 0,10 0t0 32889 pipe
cat 5323 steve 6r FIFO 0,10 0t0 32890 pipe
cat 5323 steve 7r FIFO 0,10 0t0 34359 pipe
cat 5323 steve 8w FIFO 0,10 0t0 32890 pipe
cat 5323 steve 10r FIFO 0,10 0t0 22504 pipe
cat 5323 steve 15w FIFO 0,10 0t0 22504 pipe
cat 5323 steve 16r FIFO 0,10 0t0 22505 pipe
cat 5323 steve 31w FIFO 0,10 0t0 22505 pipe
cat 5323 steve 35r FIFO 0,10 0t0 17257 pipe
cat 5323 steve 47r FIFO 0,10 0t0 31304 pipe
cat 5323 steve 49r FIFO 0,10 0t0 30264 pipe
Which raises the question: where are all those pipes coming from? Your main loop is no longer a single loop; it is a set of unsynchronized parallel loops. Look at the boilerplate below:
void *tdispatch(void *p) {
    int to[2], from[2];
    pipe(to);
    pipe(from);
    if (fork() == 0) {
        ...
    } else {
        ...
        pthread_exit(0);
    }
}
...
for (int i = 0; i < NCPU; i++) {
    pthread_create(..., tdispatch, ...);
}
for (int i = 0; i < NCPU; i++) {
    pthread_join(...);
}
Multiple instances of tdispatch can interleave the pipe(to), pipe(from), and fork() calls; thus fds leak into these forked processes. I say leaking because the forked process has no idea that they are there.
A read() on a pipe only reports end-of-file once the pipe has no buffered data and no write file descriptor remains open on it. While at least one write end is still open anywhere, a read() on an empty pipe simply blocks.
Suppose process 5 has its normal two ends of two pipes open, pointing to pipe#10 and pipe#11, and process 6 has pipe#12 and pipe#13. But, owing to the leaking above, process 5 also has the write end of pipe#12, and process 6 has the write end of pipe#10. Processes 5 and 6 will never exit because they are keeping each other's read pipes open.
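The end-of-file rule behind this can be seen without any threads or children. The sketch below is only an illustration: leaked_writer_demo is a made-up name, and dup() stands in for a write end that some other forked process inherited. The read end is set non-blocking so that the demo reports "would block" where a normal blocking read would hang.

```c
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* Hypothetical demo, not part of the question's code.
 * Stores the return values of three successive reads in results[].
 * Returns 0 if the middle read failed with "would block", -1 otherwise. */
int leaked_writer_demo(ssize_t results[3])
{
    int fds[2];
    char buf[16];

    if (pipe(fds) != 0)
        return -1;

    /* Stand-in for a write end that leaked into another process. */
    int leaked = dup(fds[1]);
    if (leaked < 0)
        return -1;

    /* Non-blocking, so the demo reports EAGAIN instead of hanging. */
    if (fcntl(fds[0], F_SETFL, O_NONBLOCK) != 0)
        return -1;

    if (write(fds[1], "hi", 2) != 2)
        return -1;
    close(fds[1]);                      /* the "real" writer is finished */

    /* Buffered data is still delivered. */
    results[0] = read(fds[0], buf, sizeof buf);

    /* Pipe is empty but a write end is still open: no EOF yet.
     * A blocking read would sleep here, which is the hang in the question. */
    results[1] = read(fds[0], buf, sizeof buf);
    int would_block = (results[1] == -1 &&
                       (errno == EAGAIN || errno == EWOULDBLOCK));

    close(leaked);                      /* last write end goes away */

    /* Now, and only now, read() reports end-of-file. */
    results[2] = read(fds[0], buf, sizeof buf);

    close(fds[0]);
    return would_block ? 0 : -1;
}
```

The three reads are expected to return 2, -1 (with EAGAIN), and 0 in that order: the data, "nothing yet but a writer still exists", and finally EOF once the stray descriptor is closed.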
The solution is pretty much what people in the earlier bit were saying: threads and forks are a tricky combination. You would have to serialize your pipe, fork, and initial-close steps in order to make it work.
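A minimal sketch of that serialization, assuming a plain pthread mutex is acceptable (under OpenMP, a #pragma omp critical around the same region should have the same effect). The names spawn_lock, run_cat and cat_worker are invented for this example. Marking the parent-side descriptors close-on-exec is an extra precaution of mine, not something the fix strictly needs: it keeps later children from carrying those ends across exec.

```c
#include <assert.h>
#include <fcntl.h>
#include <pthread.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical lock guarding the pipe/fork/initial-close sequence. */
static pthread_mutex_t spawn_lock = PTHREAD_MUTEX_INITIALIZER;

/* Hypothetical helper: pipes `input` through cat and stores the
 * NUL-terminated result in `out`. Returns bytes read, or -1 on error.
 * Like the question, it assumes the input fits in the pipe buffer. */
ssize_t run_cat(const char *input, char *out, size_t cap)
{
    int to[2], from[2];
    assert(cap > 0);

    pthread_mutex_lock(&spawn_lock);
    if (pipe(to) != 0) {
        pthread_mutex_unlock(&spawn_lock);
        return -1;
    }
    if (pipe(from) != 0) {
        close(to[0]);
        close(to[1]);
        pthread_mutex_unlock(&spawn_lock);
        return -1;
    }
    pid_t pid = fork();
    if (pid == 0) {
        /* Child: wire the pipes to stdin/stdout, drop the originals. */
        dup2(to[0], STDIN_FILENO);
        dup2(from[1], STDOUT_FILENO);
        close(to[0]);
        close(to[1]);
        close(from[0]);
        close(from[1]);
        char *argv[] = {"cat", NULL};
        execvp("cat", argv);
        _exit(127);
    }
    /* Parent: the "initial close" of the child's ends, still under the lock. */
    close(to[0]);
    close(from[1]);
    /* Extra precaution (my assumption): later children drop these on exec. */
    fcntl(to[1], F_SETFD, FD_CLOEXEC);
    fcntl(from[0], F_SETFD, FD_CLOEXEC);
    pthread_mutex_unlock(&spawn_lock);

    if (pid < 0) {
        close(to[1]);
        close(from[0]);
        return -1;
    }

    size_t len = strlen(input);
    ssize_t written = write(to[1], input, len);
    close(to[1]);                       /* cat sees EOF on stdin */

    /* read() may return short counts, so loop until EOF or buffer full. */
    size_t total = 0;
    ssize_t n;
    while (total < cap - 1 &&
           (n = read(from[0], out + total, cap - 1 - total)) > 0)
        total += (size_t)n;
    out[total] = '\0';
    close(from[0]);
    waitpid(pid, NULL, 0);
    return written == (ssize_t)len ? (ssize_t)total : -1;
}

/* Hypothetical thread entry point: returns NULL if cat echoed the text. */
void *cat_worker(void *arg)
{
    const char *text = arg;
    char out[4096];
    ssize_t n = run_cat(text, out, sizeof out);
    return (n == (ssize_t)strlen(text) && strcmp(out, text) == 0) ? NULL : arg;
}
```

cat_worker can be handed to pthread_create in the boilerplate's loop, or run_cat can be called from the body of the omp parallel for. Only the spawn step is serialized; the write, read, and waitpid still run in parallel.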