Search code examples
cipc

Why do shared memory segments run longer than pipe when transferring big data?


I am writing a lab on operating systems. I need to write two programs that transmit data, one using a pipe and one using a shared memory segment. I have to compare the transfer times and conclude that shared memory is faster for big data, but my results show the opposite. What is the problem in my code? What am I doing wrong? I calculate the elapsed time as the end of reading minus the beginning of writing.

UPD: I followed the advice in the comments, changed the code, and added the new results below.

CODE PIPE:

#define _GNU_SOURCE

#include <sys/time.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <stdio.h>
#include <wait.h>
#include <math.h>
#include <stdlib.h>
#include <fcntl.h>

#define COUNT 6

/*
 * Pipe benchmark.
 *
 * For COUNT buffer sizes (8 bytes, then 16x larger each round) a child is
 * forked to drain a pipe while the parent builds a buffer and writes it.
 * The time from just before the buffer is built until the child has exited
 * is written to "program1_result", one "<n> nsec" line per size.
 */
int main()
{
    int base = 1;

    for (int i = 0; i < COUNT; i++)
    {
        int size = 8 * base; // bytes
        base *= 16;

        // fd[0]=READ, fd[1]=WRITE
        int fd[2];
        if (pipe(fd) == -1)
        {
            perror("pipe");
            exit(1);
        }

        // Both descriptors name the same pipe, so one call is enough.  The
        // request is only a hint; if the kernel refuses it the transfer still
        // works because write() blocks until the reader makes room.
        fcntl(fd[1], F_SETPIPE_SZ, size);

        struct timespec t1, t2;
        pid_t pid = fork();
        if (pid == -1)
        {
            perror("fork");
            exit(1);
        }

        if (pid == 0)
        {
            // READ
            close(fd[1]);
            char *buf = malloc(size);
            if (buf == NULL)
                exit(1);

            // Stop on end-of-file (0) and on error (-1); testing only for
            // "!= 0" would spin forever after a failed read.
            while (read(fd[0], buf, size) > 0)
                ;

            free(buf);
            exit(0);
        }

        // WRITE
        close(fd[0]);

        // A monotonic clock is immune to wall-clock adjustments made while
        // the interval is being measured.
        clock_gettime(CLOCK_MONOTONIC, &t1);

        char *buf = malloc(size);
        if (buf == NULL)
        {
            perror("malloc");
            exit(1);
        }
        for (int j = 0; j < size; j++)
        {
            buf[j] = '1';
        }

        // write() may transfer fewer bytes than requested, so keep going
        // until the whole buffer has been handed to the pipe.
        for (int done = 0; done < size; )
        {
            ssize_t n = write(fd[1], buf + done, size - done);
            if (n <= 0)
            {
                perror("write");
                break;
            }
            done += (int)n;
        }

        close(fd[1]);   // end-of-file for the reader
        free(buf);

        wait(NULL);
        clock_gettime(CLOCK_MONOTONIC, &t2);

        long nsec = 1000000000L * (long)(t2.tv_sec - t1.tv_sec) + (t2.tv_nsec - t1.tv_nsec);

        // Truncate on the first round, append afterwards.  fclose() is only
        // reached when the file was actually opened.
        FILE *file = fopen("program1_result", i == 0 ? "w" : "a");
        if (file == NULL)
        {
            perror("program1_result");
            continue;
        }
        fprintf(file, "%ld nsec\n", nsec);
        fclose(file);
    }

    return 0;
}

CODE SHARED MEMORY:

#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/sem.h>
#include <math.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <wait.h>

#define COUNT 6

int main()
{
    int pid, status, shmid, sem, base = 1;
    char *shmPtr;
    key_t shm_keys[COUNT], sem_keys[COUNT];
    for (int i = 0; i < COUNT; i++)
    {
        shm_keys[i] = ftok(".", (char)i);
        sem_keys[i] = ftok(".", 'a' + i);
    }
    
    for (int i = 0; i < COUNT; i++)
    {
        int size = 8*base;      
        base *= 16;
        if ((shmid = shmget(shm_keys[i], size + 1, IPC_CREAT | 0666)) == -1) exit(1);
        shmPtr = shmat(shmid, 0, 0);
        
        sem = semget(sem_keys[i], 1, IPC_CREAT | 0666);
        //struct timeval s, e;
        struct timespec t1, t2;
        pid = fork();

        if (pid == 0)
        {
            while(semctl(sem, 0, GETVAL) != 1);
            char *str = (char *)malloc(size * sizeof(char));
            strcpy(str, shmPtr);
            
            free(str);
            exit(1);
        }
        else
        {           
            clock_gettime(CLOCK_REALTIME, &t1);
            char *str = (char *)malloc((size+1) * sizeof(char));
            for (int j = 0; j < size; j++)
            {
                *(str+j) = '1';
            }
            *(str + size) = '\0';
            strcpy(shmPtr, str);
            
            semctl(sem, 0, SETVAL, 1);
            free(str);          
        }   
        wait(&status);
        clock_gettime(CLOCK_REALTIME, &t2);
        FILE *file = i == 0? fopen("program2_result", "w"): fopen("program2_result", "a+");
        char cur_time[150];
        sprintf(cur_time, "%ld nsec\n", 1000000000*(t2.tv_sec - t1.tv_sec) + (t2.tv_nsec-t1.tv_nsec));
        if (file != NULL) fputs(cur_time, file);
        fclose(file);
        
        shmdt(shmPtr);
        shmctl(shmid, IPC_RMID, NULL);
        semctl(sem, 0, IPC_RMID);
    }
    
    return 0;
}   

RESULT PIPE:

  • 510134 nsec (8 bytes)
  • 751695 nsec (128 bytes)
  • 317859 nsec (2048 bytes)
  • 372219 nsec (32768 bytes)
  • 2158510 nsec (524288 bytes)
  • 19722241 nsec (8388608 bytes)

RESULT SHARED MEMORY:

  • 463168 nsec (8 bytes)
  • 369917 nsec (128 bytes)
  • 307799 nsec (2048 bytes)
  • 361162 nsec (32768 bytes)
  • 2019749 nsec (524288 bytes)
  • 24943196 nsec (8388608 bytes)
New result:
|bytes    | pipe (nsec) | shm (nsec)|
|---------|-------------|-----------|
| 8       |   517516    |  489703   |
| 128     |   205485    |  440699   |
| 2048    |   374170    |  1162227  |
| 32768   |   584830    |  548490   |
| 524288  |   3162177   |  4010005  |
| 8388608 |   50293808  |  67142116 |

Please help me understand what I am doing wrong.


Solution

  • What is the problem in my code? What am I doing wrong?

    Shared memory, particularly for large amounts of data is almost always faster.

    The two programs are not equivalent in what they measure, and this leads to erroneous results.

    When corrected, the results clearly show that shared memory is faster (2x-3x faster). And, in terms of latency of the receiver, with shared memory, the receiver can start processing data much faster.

    Consider the logical steps until the receiver can process data:

    1. Sender creates the data in the buffer
    2. For a pipe, sender does write which is a copying of the data. The receiver does a read which is [another] copying of the data.
    3. For shared memory, no copy into the kernel or out of it is needed. Shared memory is "zero copy" as far as the kernel actions are concerned.

    In other words, the pipe version does two [unnecessary] copy operations that the shm version doesn't.

    Also, consider the "latency": the time from when the sender has filled the buffer until the receiver is [capable of] processing the data. The pipe version still has to write the data to the pipe. But, with the shm version, the receiver can start immediately after receiving the "notify" message (more on that in the update below).

    Issues in the code ...

    1. We want to measure just the transfer time from process to process.
    2. In the shared memory version, the strcpy is extraneous. It defeats the purpose of using shared memory.
    3. We need to operate directly on the shared buffer.
    4. The strcpy is doing extra work that the pipe version isn't doing. That is, the pipe version is only doing read -- no processing of the data.
    5. To mitigate the effects of timeslicing and system loading, we should do the same test multiple times and take the minimum time.
    6. Your timing is a bit off.
    7. Because we're doing fork, we can use IPC_PRIVATE instead of ftok (a bit cleaner).

    Here is a refactored version. It is annotated with bugs/fixes.

    1. I've improved the benchmark code.
    2. I've added internal benchmark to show startup and latency.
    3. I've combined both programs into one .c file.
    4. Your fopen calls could be cleaned up.
    #define _GNU_SOURCE
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <errno.h>
    #include <time.h>
    
    #include <sys/time.h>
    #include <wait.h>
    #include <math.h>
    #include <fcntl.h>
    
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <sys/sem.h>
    
    // Report a fatal error: print the printf-style message to stderr and
    // terminate the process with exit status 1.  (Uses the GNU named-variadic
    // macro syntax.)
    #define sysfault(_fmt...) \
        do { \
            fprintf(stderr,_fmt); \
            exit(1); \
        } while (0)
    
    // Seconds elapsed since tscbeg, the stamp dobest takes at the start of
    // each trial.
    #define Tmark       (tscgetf() - tscbeg)
    
    // Time origin subtracted from every _tscgetf() reading; main() sets it
    // once at startup.
    double tsczero;
    
    // Read CLOCK_MONOTONIC and return it as fractional seconds measured from
    // tsczero.
    double
    _tscgetf(void)
    {
        struct timespec now;

        clock_gettime(CLOCK_MONOTONIC,&now);

        // nanoseconds -> fraction of a second
        double frac = now.tv_nsec;
        frac /= 1e9;

        // whole seconds are added before the origin is removed
        return (frac + now.tv_sec) - tsczero;
    }
    
    // Timestamp helper: executes a "nop" carrying a "memory" clobber (so the
    // compiler may not move memory accesses across this point) and then
    // evaluates to _tscgetf().  Written as a GNU statement expression.
    #define tscgetf() \
        ({ \
            __asm__ __volatile__("nop\n" ::: "memory"); \
            _tscgetf(); \
        })
    
    // State shared between dobest, the per-trial functions and result.
    int base;               // buffer size of the current test; dobest multiplies it by 4 per test
    size_t size;            // same value as a size_t; used for buffers and I/O lengths
    double t1;              // stamp taken by the sender before it fills the buffer
    double t2;              // stamp taken by the sender after the child has been waited for
    int shmid, sem;         // ids of the shared segment and the semaphore set created by dobest
    char *shmPtr;           // address at which the shared segment is attached
    double tscbeg;          // start-of-trial stamp; Tmark is measured from it
    double tscsend[3];      // sender marks for the current trial (only [0] is set in the code shown)
    double tscbest[3];      // copy of tscsend taken by dobest for the best trial
    double tscrcv[3];       // receiver marks: [0] before waiting, [1] once released
    
    // One column of the result table: its heading and the legend text.
    struct hdr {
        const char *sym;
        const char *reason;
    };
    
    // Number of entries in the hdrs[] table of the enclosing function.
    #define HDRMAX  (sizeof(hdrs) / sizeof(hdrs[0]))
    
    // Print one formatted field and pad it with spaces to lim columns.  Relies
    // on the caller having locals named file, totlen and lim.
    #define _HDRPUT(_fmt,_data...) \
        do { \
            totlen = fprintf(file,_fmt,_data); \
            for (;  totlen < lim;  ++totlen) \
                fputc(' ',file); \
        } while (0)
    // Same as _HDRPUT, and also advances the caller's column counter ihdr.
    #define HDRPUT(_fmt,_data...) \
        do { \
            _HDRPUT(_fmt,_data); \
            ++ihdr; \
        } while (0)
    
    void
    result(int i,const char *pre,double tbest)
    {
        int totlen;
        int lim = 12;
    
        char name[100];
        sprintf(name,"%s_result",pre);
    
        if (i == 0)
            unlink(name);
    
        FILE *file = fopen(name,"a");
    
        static struct hdr hdrs[] = {
            { "ELAPSED", "total elapsed time" },
            { "Tsend", "sender unlocks receiver" },
            { "Twait", "receiver becomes ready for input" },
            { "Trcv", "receiver released by sender" },
            { "Latency", "Trcv - Twait (time receiver is delayed)" },
            { "size", "buffer size" },
        };
    
        int ihdr;
        if (i == 0) {
            fprintf(file,"Legend:\n");
            for (ihdr = 0;  ihdr < HDRMAX;  ++ihdr) {
                _HDRPUT("%s",hdrs[ihdr].sym);
                fprintf(file,"-- %s\n",hdrs[ihdr].reason);
            }
    
            fprintf(file,"\n");
            for (ihdr = 0;  ihdr < HDRMAX;  ++ihdr)
                _HDRPUT("%s",hdrs[ihdr].sym);
            fprintf(file,"\n");
        }
    
        ihdr = 0;
        HDRPUT("%.9f",tbest);
        HDRPUT("%.9f",tscsend[0]);
        HDRPUT("%.9f",tscrcv[0]);
    
        HDRPUT("%.9f",tscrcv[1]);
        HDRPUT("%.9f",tscrcv[1] - tscrcv[0]);
    #if 0
        HDRPUT("%d/%d",base,size);
    #else
        fprintf(file,"%zu",size);
    #endif
        fprintf(file,"\n");
    
        fclose(file);
    }
    
    #define COUNT 16

    // Run the transfer test fnc for COUNT buffer sizes (1, 4, 16, ... bytes).
    // Every size is tried three times, to reduce the influence of timeslicing
    // and system load, and the fastest trial is passed to result().
    //
    // pre -- test name, forwarded to result()
    // fnc -- runs one trial: it sets t1, t2 and tscsend[], and its child
    //        leaves its tscrcv[] at the start of the shared segment
    void
    dobest(const char *pre,void (*fnc)(int i,int dtflg))
    {

        base = 1;
        for (int i = 0; i < COUNT; i++) {
            double tbest = 1e9;

            size = base;

            // NOTE/FIX: the child reports its marks through the start of the
            // segment, so request at least sizeof(tscrcv) bytes even for the
            // smallest buffer sizes
            size_t seglen = size + 1;
            if (seglen < sizeof(tscrcv))
                seglen = sizeof(tscrcv);

            if ((shmid = shmget(IPC_PRIVATE, seglen, IPC_CREAT | 0666)) == -1)
                sysfault("dobest: shmget failure -- %s\n",strerror(errno));

            // NOTE/FIX: shmat and semget failures used to go unnoticed
            shmPtr = shmat(shmid, 0, 0);
            if (shmPtr == (char *) -1) {
                int sverr = errno;
                shmctl(shmid, IPC_RMID, NULL);
                sysfault("dobest: shmat failure -- %s\n",strerror(sverr));
            }

            sem = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);
            if (sem == -1) {
                int sverr = errno;
                shmdt(shmPtr);
                shmctl(shmid, IPC_RMID, NULL);
                sysfault("dobest: semget failure -- %s\n",strerror(sverr));
            }

            // do multiple trials to minimize the effects of timeslicing and system
            // loading
            for (int iter = 3;  iter > 0;  --iter) {
                tscbeg = tscgetf();
                fnc(i,iter == 1);
                double tdif = t2 - t1;
                if (tdif < tbest) {
                    tbest = tdif;
                    memcpy(tscrcv,shmPtr,sizeof(tscrcv));
                    memcpy(tscbest,tscsend,sizeof(tscbest));
                }
            }

            // record the best result
            result(i,pre,tbest);

            shmdt(shmPtr);
            shmctl(shmid, IPC_RMID, NULL);
            semctl(sem, 0, IPC_RMID);

            // NOTE/FIX: no multiply after the last test -- 4^16 does not fit
            // in a 32 bit int and signed overflow is undefined
            if (i + 1 < COUNT)
                base *= 4;
        }
    }
    
    // One pipe trial: fork a reader, fill a buffer of size bytes and push it
    // through a pipe.
    //
    // Sets t1 (before the fill), tscsend[0] (before the write) and t2 (after
    // the child has exited).  The child copies its tscrcv[] marks to the start
    // of the shared segment at shmPtr.
    //
    // i, dtflg -- unused here; present to match the callback type of dobest
    void
    pipeone(int i,int dtflg)
    {
        int fd[2];
        pid_t pid;
        int status;

        (void) i;
        (void) dtflg;

        // NOTE/FIX: pipe, malloc and fork failures used to go unnoticed
        if (pipe(fd) == -1)
            sysfault("pipeone: pipe failure -- %s\n",strerror(errno));

        // best effort only -- the kernel may refuse a large request; both
        // descriptors name the same pipe, so a single call is enough
        fcntl(fd[1], F_SETPIPE_SZ, (int) size);

        char *buf = malloc(size);
        if (buf == NULL)
            sysfault("pipeone: malloc failure -- %s\n",strerror(errno));

        pid = fork();
        if (pid < 0)
            sysfault("pipeone: fork failure -- %s\n",strerror(errno));

        if (pid == 0) {
            // READ

            close(fd[1]);

            tscrcv[0] = Tmark;
            int mark = 1;

            // NOTE/FIX: the old test "!= 0" kept looping after a read error
            // (-1); stop on EOF and on any error other than an interrupted call
            for (;;) {
                ssize_t nread = read(fd[0], buf, size);

                if (nread > 0) {
                    // time of the first data to arrive
                    if (mark)
                        tscrcv[1] = Tmark;
                    mark = 0;
                    continue;
                }

                if ((nread < 0) && (errno == EINTR))
                    continue;

                break;
            }

            memcpy(shmPtr,tscrcv,sizeof(tscrcv));

            exit(0);
        }

        // WRITE
        close(fd[0]);

        t1 = tscgetf();

        for (size_t j = 0; j < size; j++)
            buf[j] = '1';

        tscsend[0] = Tmark;

        // NOTE/FIX: write may accept fewer bytes than requested -- continue
        // until the whole buffer has been sent
        for (size_t off = 0;  off < size; ) {
            ssize_t nwrite = write(fd[1], buf + off, size - off);

            if (nwrite > 0) {
                off += (size_t) nwrite;
                continue;
            }

            if ((nwrite < 0) && (errno == EINTR))
                continue;

            break;
        }

        // EOF for the reader
        close(fd[1]);

        waitpid(pid,&status,0);

        t2 = tscgetf();

        free(buf);
    }
    
    key_t shm_keys[COUNT], sem_keys[COUNT];

    // One shared-memory trial: fork a receiver, fill the shared buffer in
    // place and release the receiver through the semaphore.
    //
    // Sets t1 (before the fill), tscsend[0] (before the notification) and t2
    // (after the child has exited).  The child copies its tscrcv[] marks to
    // the start of the shared segment at shmPtr.
    //
    // i, dtflg -- unused here; present to match the callback type of dobest
    void
    shmone(int i,int dtflg)
    {
        int pid, status;

        (void) i;
        (void) dtflg;

        pid = fork();
        if (pid < 0)
            sysfault("shmone: fork failure -- %s\n",strerror(errno));

        if (pid == 0) {
            tscrcv[0] = Tmark;

    // NOTE/BUG: polling for the value 1 never consumes the notification -- the
    // semaphore is still 1 on the next trial, so that child does not wait for
    // the sender at all
    #if 0
            while (semctl(sem, 0, GETVAL) != 1);

    // NOTE/FIX: block in semop and take the unit back, which leaves the
    // semaphore at 0 for the next trial (this relies on a new semaphore
    // starting at 0, as it does on Linux)
    #else
            struct sembuf take = { .sem_num = 0, .sem_op = -1, .sem_flg = 0 };
            while ((semop(sem,&take,1) == -1) && (errno == EINTR));
    #endif
            tscrcv[1] = Tmark;

    // NOTE/BUG: not valid to copy from the shared buffer -- the whole point is to
    // operate on the shared buffer directly!!!
    #if 0
            char *str = (char *) malloc(size);
            strcpy(str, shmPtr);
            free(str);
            exit(1);

    // NOTE/FIX: do _not_ copy out the buffer -- for testing purposes, we only want
    // to measure the delivery time
    #else
            size_t len = 0;
    #if 0
            len = strlen(shmPtr);
    #endif
            memcpy(shmPtr,tscrcv,sizeof(tscrcv));
            exit(len % 0x7f);
    #endif
        }

        t1 = tscgetf();
    #if 0
        char *str = (char *) malloc((size + 1) * sizeof(char));
    #else
        char *str = shmPtr;
    #endif

        for (size_t j = 0; j < size; j++)
            str[j] = '1';
        str[size] = '\0';

    #if 0
        strcpy(shmPtr, str);
    #endif

        tscsend[0] = Tmark;
        semctl(sem, 0, SETVAL, 1);
    #if 0
        free(str);
    #endif

        wait(&status);

        t2 = tscgetf();
    }
    
    // Run the whole series of pipe trials; dobest hands the rows to result()
    // under the test name "pipe".
    void
    pipeall(void)
    {
        void (*trial)(int,int) = pipeone;

        dobest("pipe",trial);
    }
    
    // Run the whole series of shared-memory trials; dobest hands the rows to
    // result() under the test name "shm".
    void
    shmall(void)
    {

    // NOTE/FIX: the ftok() keys that were computed here were never used --
    // dobest creates every IPC object with IPC_PRIVATE -- and the first call
    // passed a proj_id of 0, for which POSIX leaves ftok unspecified
        dobest("shm",shmone);
    }
    
    // Entry point: the first argument selects the test to run ("pipe" or
    // "shm"); anything else, or no argument at all, is a fatal error.
    int
    main(int argc,char **argv)
    {

        // step over the program name
        --argc;
        ++argv;

        // establish the time origin
        tsczero = tscgetf();

        if (argc < 1)
            sysfault("main: specify type: pipe or shm\n");

        char *type = argv[0];

        if (strcmp(type,"pipe") == 0)
            pipeall();
        else if (strcmp(type,"shm") == 0)
            shmall();
        else
            sysfault("main: unknown type -- '%s'\n",type);

        return 0;
    }
    

    In the code above, I've used cpp conditionals to denote old vs. new code:

    #if 0
    // old code
    #else
    // new code
    #endif
    
    #if 1
    // new code
    #endif
    

    Note: this can be cleaned up by running the file through unifdef -k


    Here is the output of ./program pipe:

    Legend:
    ELAPSED     -- total elapsed time
    Tsend       -- sender unlocks receiver
    Twait       -- receiver becomes ready for input
    Trcv        -- receiver released by sender
    Latency     -- Trcv - Twait (time receiver is delayed)
    size        -- buffer size
    
    ELAPSED     Tsend       Twait       Trcv        Latency     size
    0.000212793 0.000097429 0.000125559 0.000162328 0.000036769 1
    0.000167312 0.000104965 0.000125442 0.000147394 0.000021952 4
    0.000177647 0.000096102 0.000093215 0.000128481 0.000035266 16
    0.000106641 0.000080761 0.000077737 0.000096395 0.000018658 64
    0.000204464 0.000163753 0.000112410 0.000148289 0.000035879 256
    0.000147881 0.000060119 0.000144019 0.000166102 0.000022083 1024
    0.000108481 0.000072081 0.000080240 0.000099838 0.000019598 4096
    0.000156202 0.000130236 0.000145147 0.000173962 0.000028815 16384
    0.000233014 0.000144926 0.000076950 0.000188236 0.000111286 65536
    0.000605337 0.000425224 0.000079762 0.000537632 0.000457870 262144
    0.002080624 0.001375300 0.000150789 0.001993695 0.001842906 1048576
    0.009131487 0.005655973 0.000243503 0.008914248 0.008670745 4194304
    0.036384772 0.021980127 0.000213079 0.035238290 0.035025211 16777216
    0.142985735 0.072771127 0.000424021 0.137922121 0.137498100 67108864
    0.572007428 0.292893437 0.000403933 0.553239097 0.552835164 268435456
    2.276978512 1.176618786 0.000574158 2.220540762 2.219966604 1073741824
    

    Here is the output of ./program shm:

    Legend:
    ELAPSED     -- total elapsed time
    Tsend       -- sender unlocks receiver
    Twait       -- receiver becomes ready for input
    Trcv        -- receiver released by sender
    Latency     -- Trcv - Twait (time receiver is delayed)
    size        -- buffer size
    
    ELAPSED     Tsend       Twait       Trcv        Latency     size
    0.000181048 0.000081319 0.000126865 0.000152599 0.000025734 1
    0.000277881 0.000079515 0.000165932 0.000177756 0.000011824 4
    0.000191934 0.000081238 0.000126623 0.000134039 0.000007416 16
    0.000201862 0.000080135 0.000124149 0.000135244 0.000011095 64
    0.000183210 0.000080799 0.000107105 0.000113932 0.000006827 256
    0.000146235 0.000085935 0.000112573 0.000119606 0.000007033 1024
    0.000144176 0.000085737 0.000000000 0.000000000 0.000000000 4096
    0.000235418 0.000100743 0.000136712 0.000148129 0.000011417 16384
    0.000172821 0.000160697 0.000123424 0.000134592 0.000011168 65536
    0.000201004 0.000241614 0.000122755 0.000129961 0.000007206 262144
    0.000748101 0.000788737 0.000091531 0.000098554 0.000007023 1048576
    0.002963725 0.003005338 0.000150246 0.000157523 0.000007277 4194304
    0.011919764 0.011992419 0.000192575 0.000201370 0.000008795 16777216
    0.047795073 0.047874544 0.000200046 0.000209144 0.000009098 67108864
    0.190522428 0.190601263 0.000178723 0.000187669 0.000008946 268435456
    0.757310083 0.757391226 0.000177408 0.000186831 0.000009423 1073741824
    

    Here is the cleaned up source:

    #define _GNU_SOURCE
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <errno.h>
    #include <time.h>
    
    #include <sys/time.h>
    #include <wait.h>
    #include <math.h>
    #include <fcntl.h>
    
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <sys/sem.h>
    
    // Fatal error helper: prints the printf-style message to stderr and ends
    // the process with exit status 1.  (GNU named-variadic macro syntax.)
    #define sysfault(_fmt...) \
        do { \
            fprintf(stderr,_fmt); \
            exit(1); \
        } while (0)
    
    // Time since the start of the current trial (tscbeg is set in dobest).
    #define Tmark       (tscgetf() - tscbeg)
    
    // Origin removed from every _tscgetf() reading; assigned once in main().
    double tsczero;
    
    // Current CLOCK_MONOTONIC time in fractional seconds, relative to tsczero.
    double
    _tscgetf(void)
    {
        struct timespec now;

        clock_gettime(CLOCK_MONOTONIC,&now);

        // scale the nanosecond field first ...
        double frac = now.tv_nsec;
        frac /= 1e9;

        // ... then add whole seconds and remove the origin
        return (frac + now.tv_sec) - tsczero;
    }
    
    // Evaluates to _tscgetf() after a "nop" with a "memory" clobber, which
    // keeps the compiler from moving memory accesses across the timestamp.
    // Written as a GNU statement expression.
    #define tscgetf() \
        ({ \
            __asm__ __volatile__("nop\n" ::: "memory"); \
            _tscgetf(); \
        })
    
    // Globals shared by dobest, pipeone/shmone and result.
    int base;               // current test's buffer size; multiplied by 4 after each test
    size_t size;            // the same size as a size_t, used for buffers and I/O lengths
    double t1;              // sender stamp taken before the buffer is filled
    double t2;              // sender stamp taken after the child was waited for
    int shmid, sem;         // shared segment id and semaphore set id (created in dobest)
    char *shmPtr;           // attach address of the shared segment
    double tscbeg;          // stamp taken by dobest at the start of each trial
    double tscsend[3];      // sender marks of the current trial (only [0] is set in the code shown)
    double tscbest[3];      // tscsend as saved by dobest for the best trial
    double tscrcv[3];       // receiver marks: [0] before waiting, [1] once released
    
    // Heading and legend text of one result column.
    struct hdr {
        const char *sym;
        const char *reason;
    };
    
    // Entry count of the hdrs[] table in the enclosing function.
    #define HDRMAX  (sizeof(hdrs) / sizeof(hdrs[0]))
    
    // Emit one formatted field padded with spaces to lim columns; expects the
    // caller to have locals named file, totlen and lim.
    #define _HDRPUT(_fmt,_data...) \
        do { \
            totlen = fprintf(file,_fmt,_data); \
            for (;  totlen < lim;  ++totlen) \
                fputc(' ',file); \
        } while (0)
    // _HDRPUT plus an increment of the caller's column counter ihdr.
    #define HDRPUT(_fmt,_data...) \
        do { \
            _HDRPUT(_fmt,_data); \
            ++ihdr; \
        } while (0)
    
    void
    result(int i,const char *pre,double tbest)
    {
        int totlen;
        int lim = 12;
    
        char name[100];
        sprintf(name,"%s_result",pre);
    
        if (i == 0)
            unlink(name);
    
        FILE *file = fopen(name,"a");
    
        static struct hdr hdrs[] = {
            { "ELAPSED", "total elapsed time" },
            { "Tsend", "sender unlocks receiver" },
            { "Twait", "receiver becomes ready for input" },
            { "Trcv", "receiver released by sender" },
            { "Latency", "Trcv - Twait (time receiver is delayed)" },
            { "size", "buffer size" },
        };
    
        int ihdr;
        if (i == 0) {
            fprintf(file,"Legend:\n");
            for (ihdr = 0;  ihdr < HDRMAX;  ++ihdr) {
                _HDRPUT("%s",hdrs[ihdr].sym);
                fprintf(file,"-- %s\n",hdrs[ihdr].reason);
            }
    
            fprintf(file,"\n");
            for (ihdr = 0;  ihdr < HDRMAX;  ++ihdr)
                _HDRPUT("%s",hdrs[ihdr].sym);
            fprintf(file,"\n");
        }
    
        ihdr = 0;
        HDRPUT("%.9f",tbest);
        HDRPUT("%.9f",tscsend[0]);
        HDRPUT("%.9f",tscrcv[0]);
    
        HDRPUT("%.9f",tscrcv[1]);
        HDRPUT("%.9f",tscrcv[1] - tscrcv[0]);
        fprintf(file,"%zu",size);
        fprintf(file,"\n");
    
        fclose(file);
    }
    
    #define COUNT 16

    // Run the transfer test fnc for COUNT buffer sizes (1, 4, 16, ... bytes).
    // Every size is tried three times and the fastest trial is passed to
    // result().
    //
    // pre -- test name, forwarded to result()
    // fnc -- runs one trial: it sets t1, t2 and tscsend[], and its child
    //        leaves its tscrcv[] at the start of the shared segment
    void
    dobest(const char *pre,void (*fnc)(int i,int dtflg))
    {

        base = 1;
        for (int i = 0; i < COUNT; i++) {
            double tbest = 1e9;

            size = base;

            // the child reports its marks through the start of the segment,
            // so request at least sizeof(tscrcv) bytes even for tiny buffers
            size_t seglen = size + 1;
            if (seglen < sizeof(tscrcv))
                seglen = sizeof(tscrcv);

            if ((shmid = shmget(IPC_PRIVATE, seglen, IPC_CREAT | 0666)) == -1)
                sysfault("dobest: shmget failure -- %s\n",strerror(errno));

            // release what was already created before giving up
            shmPtr = shmat(shmid, 0, 0);
            if (shmPtr == (char *) -1) {
                int sverr = errno;
                shmctl(shmid, IPC_RMID, NULL);
                sysfault("dobest: shmat failure -- %s\n",strerror(sverr));
            }

            sem = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);
            if (sem == -1) {
                int sverr = errno;
                shmdt(shmPtr);
                shmctl(shmid, IPC_RMID, NULL);
                sysfault("dobest: semget failure -- %s\n",strerror(sverr));
            }

            // do multiple trials to minimize the effects of timeslicing and system
            // loading
            for (int iter = 3;  iter > 0;  --iter) {
                tscbeg = tscgetf();
                fnc(i,iter == 1);
                double tdif = t2 - t1;
                if (tdif < tbest) {
                    tbest = tdif;
                    memcpy(tscrcv,shmPtr,sizeof(tscrcv));
                    memcpy(tscbest,tscsend,sizeof(tscbest));
                }
            }

            // record the best result
            result(i,pre,tbest);

            shmdt(shmPtr);
            shmctl(shmid, IPC_RMID, NULL);
            semctl(sem, 0, IPC_RMID);

            // no multiply after the last test -- 4^16 does not fit in a
            // 32 bit int and signed overflow is undefined
            if (i + 1 < COUNT)
                base *= 4;
        }
    }
    
    // One pipe trial: fork a reader, fill a buffer of size bytes and push it
    // through a pipe.
    //
    // Sets t1 (before the fill), tscsend[0] (before the write) and t2 (after
    // the child has exited).  The child copies its tscrcv[] marks to the start
    // of the shared segment at shmPtr.
    //
    // i, dtflg -- unused here; present to match the callback type of dobest
    void
    pipeone(int i,int dtflg)
    {
        int fd[2];
        pid_t pid;
        int status;

        (void) i;
        (void) dtflg;

        if (pipe(fd) == -1)
            sysfault("pipeone: pipe failure -- %s\n",strerror(errno));

        // best effort only -- the kernel may refuse a large request; both
        // descriptors name the same pipe, so a single call is enough
        fcntl(fd[1], F_SETPIPE_SZ, (int) size);

        char *buf = malloc(size);
        if (buf == NULL)
            sysfault("pipeone: malloc failure -- %s\n",strerror(errno));

        pid = fork();
        if (pid < 0)
            sysfault("pipeone: fork failure -- %s\n",strerror(errno));

        if (pid == 0) {
            // READ

            close(fd[1]);

            tscrcv[0] = Tmark;
            int mark = 1;

            // drain until EOF; a read error other than an interrupted call
            // also ends the loop instead of spinning forever
            for (;;) {
                ssize_t nread = read(fd[0], buf, size);

                if (nread > 0) {
                    // time of the first data to arrive
                    if (mark)
                        tscrcv[1] = Tmark;
                    mark = 0;
                    continue;
                }

                if ((nread < 0) && (errno == EINTR))
                    continue;

                break;
            }

            memcpy(shmPtr,tscrcv,sizeof(tscrcv));

            exit(0);
        }

        // WRITE
        close(fd[0]);

        t1 = tscgetf();

        for (size_t j = 0; j < size; j++)
            buf[j] = '1';

        tscsend[0] = Tmark;

        // write may accept fewer bytes than requested -- continue until the
        // whole buffer has been sent
        for (size_t off = 0;  off < size; ) {
            ssize_t nwrite = write(fd[1], buf + off, size - off);

            if (nwrite > 0) {
                off += (size_t) nwrite;
                continue;
            }

            if ((nwrite < 0) && (errno == EINTR))
                continue;

            break;
        }

        // EOF for the reader
        close(fd[1]);

        waitpid(pid,&status,0);

        t2 = tscgetf();

        free(buf);
    }
    
    key_t shm_keys[COUNT], sem_keys[COUNT];

    // One shared-memory trial: fork a receiver, fill the shared buffer in
    // place and release the receiver through the semaphore.
    //
    // Sets t1 (before the fill), tscsend[0] (before the notification) and t2
    // (after the child has exited).  The child copies its tscrcv[] marks to
    // the start of the shared segment at shmPtr.
    //
    // i, dtflg -- unused here; present to match the callback type of dobest
    void
    shmone(int i,int dtflg)
    {
        int pid, status;

        (void) i;
        (void) dtflg;

        pid = fork();
        if (pid < 0)
            sysfault("shmone: fork failure -- %s\n",strerror(errno));

        if (pid == 0) {
            tscrcv[0] = Tmark;

            // Block until the sender posts, and take the unit back so the
            // semaphore is 0 again for the next trial.  Merely polling for the
            // value 1 left it at 1, so later trials never waited for the
            // sender.  (Relies on a new semaphore starting at 0, as it does on
            // Linux.)
            struct sembuf take = { .sem_num = 0, .sem_op = -1, .sem_flg = 0 };
            while ((semop(sem,&take,1) == -1) && (errno == EINTR));

            tscrcv[1] = Tmark;

            // The payload is deliberately not copied out -- only the delivery
            // time is of interest.  Report the marks to the parent.
            memcpy(shmPtr,tscrcv,sizeof(tscrcv));
            exit(0);
        }

        t1 = tscgetf();

        // operate on the shared buffer directly
        char *str = shmPtr;

        for (size_t j = 0; j < size; j++)
            str[j] = '1';
        str[size] = '\0';

        tscsend[0] = Tmark;
        semctl(sem, 0, SETVAL, 1);

        wait(&status);

        t2 = tscgetf();
    }
    
    // Run the whole series of pipe trials under the test name "pipe".
    void
    pipeall(void)
    {
        void (*trial)(int,int) = pipeone;

        dobest("pipe",trial);
    }
    
    // Run the whole series of shared-memory trials under the test name "shm".
    void
    shmall(void)
    {

        // No ftok() keys are prepared here any more: dobest creates every IPC
        // object with IPC_PRIVATE, so the keys were never used, and the first
        // call passed a proj_id of 0, for which POSIX leaves ftok unspecified.
        dobest("shm",shmone);
    }
    
    // Entry point: the first argument selects the test to run ("pipe" or
    // "shm"); anything else, or no argument at all, is a fatal error.
    int
    main(int argc,char **argv)
    {

        // step over the program name
        --argc;
        ++argv;

        // establish the time origin
        tsczero = tscgetf();

        if (argc < 1)
            sysfault("main: specify type: pipe or shm\n");

        char *type = argv[0];

        if (strcmp(type,"pipe") == 0)
            pipeall();
        else if (strcmp(type,"shm") == 0)
            shmall();
        else
            sysfault("main: unknown type -- '%s'\n",type);

        return 0;
    }
    

    UPDATE:

    I've done some considerable cleanup on the test program. Due to some objections, I've added some options:

    -C<n> -- test count
    -I<n> -- iteration count
    -m<0/1> -- notification (1=msgsnd/msgrcv, 0=sem*)
    -S<0/1> -- 1=pipe set maximum size (F_SETPIPE_SZ)
    -x<0/1> -- 1=child scan/process data
    -v<0/1> -- 1=copy results to stdout
    

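    It turns out that using sem* for notification [as posted here] is not as reliable as msgsnd/msgrcv [which is what I traditionally use]. Using sem* can produce a negative latency value for some tests. This indicates that the mechanism isn't robust. A likely cause: the receiver only polls the semaphore for the value 1 and nothing resets it to 0, so on every iteration after the first the receiver finds it already set and runs before the sender has notified it; a received message, by contrast, is consumed. So, I replaced it with msgsnd/msgrcv.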

    I added the -x option to force the receiver to "do work" on the buffer. This had a slight effect. As I mentioned, we want to measure the transfer time (and not the transfer time + processing time), so I'd leave off -x, but it's there for completeness.

    Also, if no "test" argument is given, the program will run the "pipe" test and then the "shm" test sequentially (in subprocesses) as a convenience.

    More details about the options are in the code:

    #define _GNU_SOURCE
    
    #include <stdio.h>
    #include <stdlib.h>
    #include <unistd.h>
    #include <string.h>
    #include <errno.h>
    #include <time.h>
    
    #include <sys/time.h>
    #include <wait.h>
    #include <math.h>
    #include <fcntl.h>
    
    #include <sys/types.h>
    #include <sys/ipc.h>
    #include <sys/shm.h>
    #include <sys/sem.h>
    #include <sys/msg.h>
    
    // Run-time options with their defaults.
    int opt_C = 16;         // number of tests (buffer sizes) run by dobest
    int opt_I = 3;          // trials per test; the fastest one is reported
    int opt_m = 1;          // notification: 1=msgsnd/msgrcv, 0=semaphore
    int opt_S = 1;          // 1=pipe set maximum size (F_SETPIPE_SZ)
    int opt_x;              // 1=receiver scans the data (see scan)
    int opt_v;              // 1=copy the result file to stdout (see result_show)
    
    // Fatal error helper: prints the printf-style message to stderr and ends
    // the process with exit status 1.  (GNU named-variadic macro syntax.)
    #define sysfault(_fmt...) \
        do { \
            fprintf(stderr,_fmt); \
            exit(1); \
        } while (0)
    
    // Time since the start of the current trial (tscbeg is set in dobest).
    #define Tmark       (tscgetf() - tscbeg)
    
    // Origin removed from every _tscgetf() reading.
    double tsczero;
    
    // Current CLOCK_MONOTONIC time in fractional seconds, relative to tsczero.
    double
    _tscgetf(void)
    {
        struct timespec now;

        clock_gettime(CLOCK_MONOTONIC,&now);

        // scale the nanosecond field first ...
        double frac = now.tv_nsec;
        frac /= 1e9;

        // ... then add whole seconds and remove the origin
        return (frac + now.tv_sec) - tsczero;
    }
    
    #define barrier \
        __asm__ __volatile__("nop\n" ::: "memory"); \
    
    // barrier (above): a "nop" with a "memory" clobber, which keeps the
    // compiler from moving memory accesses across it.  The expansion already
    // ends in a semicolon and the definition continues onto the blank line
    // that follows it.
    
    // Evaluates to _tscgetf() after a barrier (GNU statement expression).
    #define tscgetf() \
        ({ \
            barrier; \
            _tscgetf(); \
        })
    
    // Globals shared by dobest, the per-trial functions, the notify_* helpers
    // and result.
    int base;               // current test's buffer size; multiplied by 4 after each test
    size_t size;            // the same size as a size_t
    double t1;              // start stamp of a trial; dobest reports t2 - t1
    double t2;              // end stamp of a trial
    int shmid;              // id of the shared segment created by dobest
    int sem;                // notification object: message queue id or semaphore set id (see notify_open)
    char *shmPtr;           // attach address of the shared segment
    char *fake;             // receives the memchr result in scan
    int forkflg;            // nonzero: dobest runs its test loop in a forked child
    double tscbeg;          // stamp taken by dobest at the start of each trial
    
    double tscsend[3];      // sender marks of the current trial, indexed by Tnotify
    double tscbest[3];      // tscsend as saved by dobest for the best trial
    double tscrcv[3];       // receiver marks, indexed by Tready/Trunning
    
    // Notification message exchanged through msgsnd/msgrcv.
    struct msg {
        long msg_type;
        int msg_payload;
    };
    #define MSG_TYPE        17
    
    // Index into tscsend/tscbest.
    enum {
        Tnotify
    };
    
    // Indices into tscrcv.
    enum {
        Tready,
        Trunning,
    };
    
    // Name of the current result file ("<test>_result"), set by dobest.
    char result_file[100];
    
    // Heading and legend text of one result column.
    struct hdr {
        const char *sym;
        const char *reason;
    };
    
    // Entry count of the hdrs[] table in the enclosing function.
    #define HDRMAX  (sizeof(hdrs) / sizeof(hdrs[0]))
    
    // Emit one formatted field padded with spaces to lim columns; expects the
    // caller to have locals named xf, totlen and lim.
    #define _HDRPUT(_fmt,_data...) \
        do { \
            totlen = fprintf(xf,_fmt,_data); \
            for (;  totlen < lim;  ++totlen) \
                fputc(' ',xf); \
        } while (0)
    // _HDRPUT plus an increment of the caller's column counter ihdr.
    #define HDRPUT(_fmt,_data...) \
        do { \
            _HDRPUT(_fmt,_data); \
            ++ihdr; \
        } while (0)
    
    // Write the current option settings to xf: an "options:" heading followed
    // by one line per option.
    void
    optshow(FILE *xf)
    {
        const struct {
            const char *fmt;
            int val;
        } rows[] = {
            { "  C=%d -- test count\n", opt_C },
            { "  I=%d -- iteration count\n", opt_I },
            { "  m=%d -- notification (1=msgsnd/msgrcv, 0=sem*)\n", opt_m },
            { "  S=%d -- 1=pipe set maximum size (F_SETPIPE_SZ)\n", opt_S },
            { "  x=%d -- 1=child scan/process data\n", opt_x },
        };

        fprintf(xf,"options:\n");

        for (size_t idx = 0;  idx < sizeof(rows) / sizeof(rows[0]);  ++idx)
            fprintf(xf,rows[idx].fmt,rows[idx].val);
    }
    
    // Append one row of timings for test i to result_file.  For i == 0 the
    // file is recreated and the test name, the option settings, a legend and
    // a column header are written before the row.
    //
    // i     -- test index (0 starts a fresh file)
    // pre   -- test name, printed in the "TEST:" line
    // tbest -- smallest elapsed time seen for this test, in seconds
    //
    // The _HDRPUT/HDRPUT macros refer to the locals totlen, lim, xf and ihdr
    // by name.
    void
    result(int i,const char *pre,double tbest)
    {
        int totlen;
        int lim = opt_m ? 12 : 13;

        if (i == 0)
            unlink(result_file);

        // fail loudly rather than pass a NULL stream to fprintf
        FILE *xf = fopen(result_file,"a");
        if (xf == NULL)
            sysfault("result: unable to open '%s' -- %s\n",
                result_file,strerror(errno));

        static const struct hdr hdrs[] = {
            { "ELAPSED", "total elapsed time" },
            { "Tnotify", "sender buffer filled -- receiver notified" },
            { "Tready", "receiver becomes ready for input" },
            { "Trunning", "receiver released by sender" },
            { "Latency", "Trunning - Tnotify (time receiver is delayed)" },
            { "size", "buffer size" },
        };

        // size_t matches the type of HDRMAX
        size_t ihdr;
        if (i == 0) {
            fprintf(xf,"TEST: %s ...\n",pre);
            optshow(xf);
            fprintf(xf,"\n");

            fprintf(xf,"Legend:\n");
            for (ihdr = 0;  ihdr < HDRMAX;  ++ihdr) {
                _HDRPUT("%s",hdrs[ihdr].sym);
                fprintf(xf,"-- %s\n",hdrs[ihdr].reason);
            }

            fprintf(xf,"\n");
            for (ihdr = 0;  ihdr < HDRMAX;  ++ihdr)
                _HDRPUT("%s",hdrs[ihdr].sym);
            fprintf(xf,"\n");
        }

        ihdr = 0;
        HDRPUT("%.9f",tbest);
        HDRPUT("%.9f",tscbest[Tnotify]);
        HDRPUT("%.9f",tscrcv[Tready]);

        HDRPUT("%.9f",tscrcv[Trunning]);
        HDRPUT("%.9f",tscrcv[Trunning] - tscbest[Tnotify]);
        fprintf(xf,"%zu",size);
        fprintf(xf,"\n");

        fclose(xf);
    }
    
    // When -v is set, copy the current result file to stdout by running
    // "cat" on it, preceded by a blank line.  Does nothing otherwise.
    void
    result_show(void)
    {
        char *av[3];
        pid_t pid;

        if (! opt_v)
            return;

        if (result_file[0] == 0)
            sysfault("result_show: null result_file\n");

        av[0] = "cat";
        av[1] = result_file;
        av[2] = NULL;

        // flush first so that pending output is not carried into the child
        fflush(stdout);

        pid = fork();

        // a pid of -1 would turn the waitpid below into "wait for any child"
        if (pid < 0)
            sysfault("result_show: fork fault -- %s\n",strerror(errno));

        if (pid == 0) {
            printf("\n");

            // exec discards the stdio buffer -- push the newline out first
            fflush(stdout);

            execvp(av[0],av);
            sysfault("result_show: exec fault -- %s\n",strerror(errno));
        }

        waitpid(pid,NULL,0);
    }
    
    // With -x set, search the first len bytes of buf for the character '2' and
    // keep the result in the global fake; without -x, do nothing.
    void
    scan(char *buf,size_t len)
    {

        if (! opt_x)
            return;

        fake = memchr(buf,'2',len);
    }
    
    // Create the notification object for the current test and store its id in
    // sem: a private message queue when opt_m is set, a private one-element
    // semaphore set otherwise.  Failure to create it is fatal.
    void
    notify_open(void)
    {

        if (opt_m)
            sem = msgget(IPC_PRIVATE,IPC_CREAT | 0666);
        else
            sem = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);

        // without a valid id every later notify_* call would fail silently
        if (sem == -1)
            sysfault("notify_open: %s failure -- %s\n",
                opt_m ? "msgget" : "semget",strerror(errno));
    }
    
    // Remove the notification object created by notify_open.
    void
    notify_close(void)
    {

        // msgctl takes (id, cmd, buf); semctl takes (id, semnum, cmd).  The
        // message queue call previously used the semctl argument order and
        // only worked where IPC_RMID happens to be 0.
        if (opt_m)
            msgctl(sem, IPC_RMID, NULL);
        else
            semctl(sem, 0, IPC_RMID);
    }
    
    // Sender side of the notification: record the Tnotify mark, then release
    // the receiver through the message queue (opt_m) or the semaphore.
    void
    notify_send(void)
    {
        struct msg msg;

        tscsend[Tnotify] = Tmark;

        if (opt_m) {
            // clear the whole struct -- everything after msg_type is sent,
            // and none of it was initialized before
            memset(&msg,0,sizeof(msg));
            msg.msg_type = MSG_TYPE;

            // an unsent notification would leave the receiver blocked forever
            if (msgsnd(sem,&msg,sizeof(msg) - sizeof(msg.msg_type),0) == -1)
                sysfault("notify_send: msgsnd failure -- %s\n",strerror(errno));
        }
        else
            semctl(sem, 0, SETVAL, 1);
    }
    
    // Receiver side of the notification: record the Tready mark, block until
    // the sender notifies, then record the Trunning mark.
    void
    notify_wait(void)
    {

        tscrcv[Tready] = Tmark;

        if (opt_m) {
            struct msg msg;

            // retry when a signal interrupts the wait
            while ((msgrcv(sem,&msg,sizeof(msg) - sizeof(msg.msg_type),
                MSG_TYPE,0) == -1) && (errno == EINTR));
        }
        else {
            // Block until the semaphore is posted and take the unit back, so
            // it is 0 again for the next trial.  Merely polling for the value
            // 1 left it set, which let later trials run ahead of the sender.
            // (Relies on a new semaphore starting at 0, as it does on Linux.)
            struct sembuf take = { .sem_num = 0, .sem_op = -1, .sem_flg = 0 };

            while ((semop(sem,&take,1) == -1) && (errno == EINTR));
        }

        tscrcv[Trunning] = Tmark;
    }
    
    // dobest -- run one transfer method over a range of sizes, keep best times
    //
    // pre is the method name; it appears in the progress output and forms the
    // result file name "<pre>_result".
    // fnc performs one timed transfer.  It is called as fnc(i,dtflg) where i is
    // the size index and dtflg is non-zero on the last trial for that size.
    // The elapsed time is taken from the globals t1/t2 after fnc returns.
    //
    // The size starts at 1 and is multiplied by 4 for each of the opt_C steps.
    // For every size opt_I trials are run and the one with the smallest
    // t2 - t1 is handed to result().  When forkflg is set the whole run
    // happens in a child process; the parent only waits for it and then
    // shows the result file.
    void
    dobest(const char *pre,void (*fnc)(int i,int dtflg))
    {

        if (! opt_v) {
            printf("\n");
            printf("dobest: %s ...\n",pre);
        }
        fflush(stdout);

        sprintf(result_file,"%s_result",pre);

        if (forkflg) {
            pid_t pid = fork();
            if (pid < 0)
                sysfault("dobest: fork failure -- %s\n",strerror(errno));
            if (pid != 0) {
                waitpid(pid,NULL,0);
                result_show();
                return;
            }
        }

        base = 1;
        for (int i = 0; i < opt_C; i++) {
            double tbest = 1e9;

            size = base;

            // The child hands its timestamps back through the start of the
            // segment (see the memcpy from shmPtr below), so the segment must
            // hold at least sizeof(tscrcv) bytes even for tiny payloads.
            size_t shmlen = size + 1;
            if (shmlen < sizeof(tscrcv))
                shmlen = sizeof(tscrcv);

            if ((shmid = shmget(IPC_PRIVATE, shmlen, IPC_CREAT | 0666)) == -1)
                sysfault("dobest: shmget failure -- %s\n",strerror(errno));

            // shmat reports failure as (void *) -1, not NULL
            shmPtr = shmat(shmid, 0, 0);
            if (shmPtr == (void *) -1)
                sysfault("dobest: shmat failure -- %s\n",strerror(errno));
            notify_open();

            // do multiple trials to minimize the effects of timeslicing and system
            // loading
            for (int iter = opt_I;  iter > 0;  --iter) {
                tscbeg = tscgetf();
                barrier;

                fnc(i,iter == 1);

                double tdif = t2 - t1;
                if (tdif < tbest) {
                    tbest = tdif;
                    // keep the timestamps that belong to the best trial
                    memcpy(tscrcv,shmPtr,sizeof(tscrcv));
                    memcpy(tscbest,tscsend,sizeof(tscbest));
                }
            }

            // record the best result
            result(i,pre,tbest);

            shmdt(shmPtr);
            shmctl(shmid, IPC_RMID, NULL);
            notify_close();

            base *= 4;
        }

        if (forkflg)
            exit(0);

        result_show();
    }
    
    // pipeone -- one timed transfer of "size" bytes through a pipe
    //
    // i and dtflg are part of the common trial-function signature and are not
    // used here.
    //
    // The parent fills a buffer with '1' bytes and writes it to the pipe; a
    // forked child reads until EOF, passing each chunk to scan().  The child
    // returns its timestamps by copying tscrcv into the shared segment at
    // shmPtr.  t1 is taken before the parent fills the buffer and t2 after the
    // child has exited.
    void
    pipeone(int i,int dtflg)
    {
        int fd[2];
        ssize_t nread;
        pid_t pid;

        if (pipe(fd) < 0)
            sysfault("pipeone: pipe failure -- %s\n",strerror(errno));

        if (opt_S) {
            // F_SETPIPE_SZ expects an int argument
            fcntl(fd[1], F_SETPIPE_SZ, (int) size);
            fcntl(fd[0], F_SETPIPE_SZ, (int) size);
        }

        char *buf = malloc(size);
        if (buf == NULL)
            sysfault("pipeone: malloc failure -- %s\n",strerror(errno));

        // without a reader the writes below would fail or raise SIGPIPE
        pid = fork();
        if (pid < 0)
            sysfault("pipeone: fork failure -- %s\n",strerror(errno));

        if (pid == 0) {
            // READ

            close(fd[1]);

            tscrcv[0] = Tmark;
            int mark = 1;
            while (1) {
                nread = read(fd[0], buf, size);
                int sverr = errno;

                if (nread == 0)
                    break;

                // timestamp only the first read that returns
                if (mark)
                    tscrcv[1] = Tmark;
                mark = 0;

                if (nread < 0) {
                    printf("pipeone: read error -- %s\n",strerror(sverr));
                    break;
                }

                scan(buf,nread);
            }

            // hand the timestamps back to the parent
            memcpy(shmPtr,tscrcv,sizeof(tscrcv));

            exit(0);
        }

        // WRITE
        close(fd[0]);

        t1 = tscgetf();

        for (size_t j = 0; j < size; j++)
            buf[j] = '1';

        tscsend[0] = Tmark;

        // a single write may be short, so loop until everything is sent
        size_t off = 0;
        ssize_t xlen;
        for (;  off < size;  off += xlen) {
            xlen = write(fd[1], &buf[off], size - off);
            if (xlen < 0) {
                printf("pipeone: write error off=%zu size=%zu -- %s\n",
                    off,size,strerror(errno));
                break;
            }
        }

        // closing the write end gives the reader its EOF
        close(fd[1]);

        waitpid(pid,NULL,0);

        t2 = tscgetf();

        free(buf);
    }
    
    // shmone -- one timed transfer of "size" bytes through shared memory
    //
    // i and dtflg are part of the common trial-function signature and are not
    // used here.
    //
    // The parent fills the shared segment at shmPtr with '1' bytes plus a
    // terminating NUL and then calls notify_send().  A forked child waits in
    // notify_wait(), scans the segment and finally copies its timestamps
    // (tscrcv) into the start of the segment.  t1 is taken before the parent
    // fills the segment and t2 after the child has exited.
    void
    shmone(int i,int dtflg)
    {
        pid_t pid;

        // with no child the notification below would never be consumed
        pid = fork();
        if (pid < 0)
            sysfault("shmone: fork failure -- %s\n",strerror(errno));

        if (pid == 0) {
            notify_wait();

            // the child works on the shared buffer in place; copying it out
            // first would defeat the purpose of using shared memory
            scan(shmPtr,size);
            memcpy(shmPtr,tscrcv,sizeof(tscrcv));
            exit(0);
        }

        t1 = tscgetf();
        char *str = shmPtr;

        // same byte-by-byte fill as the pipe variant
        for (size_t j = 0; j < size; j++)
            str[j] = '1';
        str[size] = '\0';

        notify_send();

        waitpid(pid,NULL,0);

        t2 = tscgetf();
    }
    
    int
    main(int argc,char **argv)
    {
        char *cp;
    
        --argc;
        ++argv;
    
        setlinebuf(stdout);
        tsczero = tscgetf();
        printf("main (%d)\n",getpid());
    
        for (;  argc > 0;  --argc, ++argv) {
            cp = *argv;
            if (*cp != '-')
                break;
    
            cp += 2;
            switch (cp[-1]) {
            case 'C':  // number of trials
                opt_C = (*cp != 0) ? atoi(cp) : 1;
                break;
    
            case 'I':  // number of iterations
                opt_I = (*cp != 0) ? atoi(cp) : 1;
                break;
    
            case 'm':  // notifcation message type
                opt_m = (*cp != 0) ? atoi(cp) : 1;
                break;
    
            case 'S':  // pipe sender should issue F_SETPIPE_SZ
                opt_S = (*cp != 0) ? atoi(cp) : 1;
                break;
    
            case 'v':  // dump log files to stdout
                opt_v = (*cp != 0) ? atoi(cp) : 1;
                break;
    
            case 'x':  // child should scan data
                opt_x = (*cp != 0) ? atoi(cp) : 1;
                break;
    
            default:
                sysfault("main: unknown option -- '%s'\n",cp - 2);
                break;
            }
        }
    
        if (! opt_v)
            optshow(stdout);
    
        do {
            cp = *argv;
    
            if (argc < 1) {
                forkflg = 1;
                dobest("pipe",pipeone);
                dobest("shm",shmone);
                break;
            }
    
            if (strcmp(cp,"pipe") == 0) {
                dobest("pipe",pipeone);
                break;
            }
    
            if (strcmp(cp,"shm") == 0) {
                dobest("shm",shmone);
                break;
            }
    
            sysfault("main: unknown type -- '%s'\n",cp);
        } while (0);
    
        return 0;
    }