I am supposed to measure the latency and bandwidth between two processes. To do this, I wrote a simple program in C using pipe and fork. To see that everything works, I wanted the parent process and the child process to work alternately. So I wanted the parent process to stop after writing and continue after the child process read and printed the string with 'a'.
I used kill() and pause() along with sleep(). My problem is that from time to time the program terminates on its own, while in other runs it works fine, presumably because it does not hit the race condition.
So far I have the following code:
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <signal.h>
/* Number of payload bytes written to the pipe on every iteration. */
#define DATASIZE 5

/* Elapsed time of the child's measurement loop, accumulated in seconds. */
double diff = 0.0;

/* Iterations completed so far; incremented by the child after each wake-up. */
int count = 0;
/*
 * Signal handler with an intentionally empty body.
 * It is installed for SIGUSR1 so that delivery of the signal only
 * interrupts a blocking call such as pause().
 */
void wakeup_handler(int sig)
{
    (void)sig; /* unused; present only because the handler signature requires it */
}
/*
 * Ping-pong benchmark over a pipe between a forked child (writer) and the
 * parent (reader).
 *
 * The child writes DATASIZE bytes, then sleeps in pause() until the parent
 * has read and printed them and sent SIGUSR1. After 1000 rounds the child
 * prints the average time per round and a throughput figure.
 *
 * argc, argv, n and total_time are not used.
 * Returns 0 from both processes; the parent does not wait for the child.
 */
int main(int argc, char *argv[])
{
int pipefd[2];
pid_t pid;
int i, n;
double total_time = 0;
struct timeval t1, t2;
if (pipe(pipefd) == -1)
{
perror("pipe");
exit(EXIT_FAILURE);
}
pid = fork();
if (pid == -1)
{
perror("fork");
exit(EXIT_FAILURE);
}
// Child process: the writer side of the pipe
if (pid == 0)
{
// The child never reads, so drop the read end
close(pipefd[0]);
char tmp1[DATASIZE];
// Fill the payload with 'a' (no terminating '\0' is stored)
for (i = 0; i < DATASIZE; i++)
{
tmp1[i] = 'a';
}
gettimeofday(&t1, NULL);
// Install the empty handler so SIGUSR1 wakes pause() up
signal(SIGUSR1, wakeup_handler);
// This loop declares its own i, shadowing the outer one
for (int i = 0; i < 1000; i++) {
write(pipefd[1], tmp1, sizeof(tmp1));
// NOTE(review): race window. If SIGUSR1 is delivered after write() returns
// but before pause() is entered, the handler has already run and pause()
// waits for a signal the parent only sends after its next read(), which in
// turn waits for the next write(). Both processes can then block.
pause();
count++;
printf("CountC: %d\n", count);
}
gettimeofday(&t2, NULL);
diff = (t2.tv_sec - t1.tv_sec); // whole seconds elapsed
diff += (t2.tv_usec - t1.tv_usec) / 1e6; // plus the microsecond part, converted to seconds
// Disabled correction for a 10 second sleep in the parent process:
// diff+= diff - 10;
// band is bytes per second: DATASIZE bytes in each of the 1000 rounds
double band = (DATASIZE * 1000) / diff;
printf("Average latency: %lf seconds\n", diff / 1000);
// NOTE(review): band / 1e6 is megabytes per second, while the label says Mbps
printf("Average bandwidth: %lf Mbps\n", band / 1e6);
close(pipefd[1]);
}
// Parent process: the reader side of the pipe
else
{
// The parent never writes, so drop the write end
close(pipefd[1]);
char tmp2[DATASIZE];
for (int i = 0; i < 1000; i++) {
// printf("Parent: Iteration %d...\n", i+1);
// Blocks until the child has written; the return value is not checked
read(pipefd[0], tmp2, sizeof(tmp2));
// NOTE(review): tmp2 holds DATASIZE bytes without a terminating '\0',
// so "%s" can read past the end of the array
printf("%s\n", tmp2);
// NOTE(review): sleep() takes an unsigned number of whole seconds, so
// 0.01 is converted to 0 and this call does not delay at all
sleep(0.01);
// Wake the child so it can send the next payload
kill(pid, SIGUSR1);
}
close(pipefd[0]);
}
return 0;
}
To avoid the race condition where a signal may be delivered before `pause` has executed, instead of "buying more time" by calling `sleep` in the parent process, `sigprocmask` may be used to block the delivery of signals in the child process, delaying their arrival until they are later unblocked. A delayed signal is said to be pending.
`sigsuspend` can be used to temporarily unblock signals (by changing the signal mask) while it waits for a signal to arrive.
To avoid the race condition where the parent process sends the data-request signal before the child process has finished establishing its signal handling, the pipe can be used to transmit some initial data. The parent process executes a blocking `read`, and the child process indicates it is ready with an initial `write` that unblocks the parent.
When the work is complete, the parent process should `wait` for the child process to terminate.
For significantly improved portability, prefer `sigaction` to `signal`.
Here is a cursory refactoring of your example. For the sake of brevity, it has little to no error handling.
Do note that the blocking nature of `write` and `read` helps create a lockstep here, ensuring the parity of iterations between processes. Otherwise, calling `kill` N times does not guarantee N writes to the pipe, as a to-be-delivered signal would be dropped if it is already pending. An important part of what makes this synchronization work is that the parent process immediately waits for the data it requests.
#include <signal.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/time.h>
#include <sys/wait.h>
#include <unistd.h>
/* Size in bytes of the payload carried by one packet. */
enum { PACKET_BUFFER_SIZE = 32 };

/*
 * Fixed-size message sent through the pipe on every iteration.
 * The whole struct is written and read as a single unit.
 */
struct packet {
    char buffer[PACKET_BUFFER_SIZE];
};
/*
 * SIGUSR1 handler. It deliberately does no work: it is installed only so
 * that delivery of the signal interrupts sigsuspend().
 */
void wakeup_handler(int sig)
{
    (void) sig; /* the signal number is not needed */
}
/*
 * Ping-pong benchmark between a parent and a forked child over a pipe.
 *
 * The child blocks SIGUSR1, tells the parent it is ready by writing an
 * initial int, and then, for each cycle, waits in sigsuspend() for SIGUSR1
 * and answers with one struct packet. The parent requests each packet with
 * kill() and immediately blocks in read() for it, which keeps both
 * processes in lockstep.
 *
 * Returns EXIT_SUCCESS when every cycle completed, EXIT_FAILURE otherwise.
 */
int main(void)
{
    /* each process receives a copy of the number of tests to run */
    const unsigned cycles = 1000;
    int pipefd[2];

    if (pipe(pipefd) == -1) {
        perror("pipe");
        return EXIT_FAILURE;
    }

    pid_t pid = fork();

    if (pid == -1) {
        /* Must not continue: kill(-1, SIGUSR1) would signal every
         * process this user is allowed to signal.
         */
        perror("fork");
        return EXIT_FAILURE;
    }

    if (pid == 0) {
        /* Child process */
        close(pipefd[0]);

        struct sigaction sa = { .sa_handler = wakeup_handler };
        sigemptyset(&sa.sa_mask);
        sigaction(SIGUSR1, &sa, NULL);

        sigset_t mask, original;
        sigemptyset(&mask);
        sigaddset(&mask, SIGUSR1);

        /* block (delay) delivery of the SIGUSR1 signal */
        sigprocmask(SIG_BLOCK, &mask, &original);

        /* write some initial data
         * letting the parent know the child has finished
         * setting up its signal handling
         */
        int ready = 1;
        if (write(pipefd[1], &ready, sizeof ready) == -1) {
            perror("write");
            exit(EXIT_FAILURE);
        }

        struct timeval t1, t2;
        int count = 0;

        gettimeofday(&t1, NULL);

        for (unsigned i = 0; i < cycles; i++) {
            struct packet output = { "aaaaa" };

            /* temporarily restore the original signal mask
             * and wait for a signal to arrive
             * (including previously delayed signals)
             */
            sigsuspend(&original);

            /* SIGUSR1 is blocked again here, so this write cannot be
             * interrupted by the parent's next request.
             */
            if (write(pipefd[1], &output, sizeof output) == -1) {
                perror("write");
                exit(EXIT_FAILURE);
            }

            printf("CountC: %d\n", ++count);
        }

        gettimeofday(&t2, NULL);

        /* elapsed time in seconds: whole seconds plus microseconds */
        double diff = (double) (t2.tv_sec - t1.tv_sec)
            + (t2.tv_usec - t1.tv_usec) / 1e6;
        double bytes_per_sec = (double) sizeof (struct packet) * cycles / diff;

        printf("Average latency: %lf seconds\n", diff / cycles);
        /* bytes/s -> bits/s -> megabits/s, to match the printed unit */
        printf("Average bandwidth: %lf Mbps\n", bytes_per_sec * 8 / 1e6);

        close(pipefd[1]);

        /* ensure child does not run off */
        exit(0);
    }

    /* Parent process */
    close(pipefd[1]);

    int status = EXIT_SUCCESS;

    /* block execution in the parent process until
     * some initial data arrives indicating the child process
     * is ready to receive signals
     */
    int initialized;
    if (read(pipefd[0], &initialized, sizeof initialized)
            != (ssize_t) sizeof initialized) {
        fprintf(stderr, "child did not report readiness\n");
        status = EXIT_FAILURE;
    }

    for (unsigned i = 0; status == EXIT_SUCCESS && i < cycles; i++) {
        struct packet input;

        if (kill(pid, SIGUSR1) == -1) {
            perror("kill");
            status = EXIT_FAILURE;
            break;
        }

        /* anything but a full packet means the child is gone or failed */
        if (read(pipefd[0], &input, sizeof input) != (ssize_t) sizeof input) {
            fprintf(stderr, "incomplete packet from child\n");
            status = EXIT_FAILURE;
            break;
        }

        printf("%.*s\n", (int) sizeof input.buffer, input.buffer);
    }

    close(pipefd[0]);

    if (status != EXIT_SUCCESS) {
        /* make sure a child still waiting in sigsuspend() terminates,
         * otherwise the wait() below could block forever
         */
        kill(pid, SIGTERM);
    }

    /* wait for the child process to terminate */
    wait(NULL);

    return status;
}