I am writing a lab on operating systems. I need to write two programs that transmit data, one using a pipe and one using a shared memory segment. I have to compare the transfer times and come to the conclusion that shared memory is faster for large amounts of data. But my results show the opposite. What is the problem in my code? What am I doing wrong? I calculate the elapsed time as the end of reading minus the beginning of writing.
UPD: I listened to the comments and changed the code, add new result
CODE PIPE:
#define _GNU_SOURCE
#include <sys/time.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <stdio.h>
#include <wait.h>
#include <math.h>
#include <stdlib.h>
#include <fcntl.h>
#define COUNT 6
/*
 * Pipe benchmark.
 *
 * For COUNT payload sizes (8 bytes, growing 16x per round) a reader child is
 * forked, the payload is pushed through a pipe, and the elapsed wall-clock
 * time -- from just before the parent prepares its buffer until the child has
 * drained the pipe and exited -- is appended to "program1_result".
 */
int main()
{
    // fd[0]=READ, fd[1]=WRITE
    int fd[2], pid, status, base = 1;

    for (int i = 0; i < COUNT; i++)
    {
        int size = 8 * base; // bytes
        base *= 16;

        if (pipe(fd) == -1)
        {
            perror("pipe");
            exit(1);
        }

        // Best effort: ask for a pipe buffer as large as the payload.  The
        // kernel may refuse the request; the transfer still works with the
        // default buffer, so the result is deliberately not checked.
        fcntl(fd[1], F_SETPIPE_SZ, size);
        fcntl(fd[0], F_SETPIPE_SZ, size);

        struct timespec t1, t2;
        pid = fork();
        if (pid == -1)
        {
            perror("fork");
            exit(1);
        }
        if (pid == 0)
        {
            // READ: drain the pipe until the writer closes its end (or an
            // error occurs -- looping on "!= 0" would spin forever on -1)
            ssize_t nread;
            close(fd[1]);
            char *buf = malloc(size);
            if (buf == NULL)
                exit(1);
            while ((nread = read(fd[0], buf, size)) > 0)
                ;
            free(buf);
            exit(nread < 0 ? 1 : 0);
        }

        // WRITE
        close(fd[0]);
        clock_gettime(CLOCK_REALTIME, &t1);
        char *buf = malloc(size);
        if (buf == NULL)
        {
            perror("malloc");
            exit(1);
        }
        for (int j = 0; j < size; j++)
        {
            buf[j] = '1';
        }

        // write() may transfer fewer bytes than requested, so keep going
        // until the whole payload has been sent
        int off = 0;
        while (off < size)
        {
            ssize_t n = write(fd[1], buf + off, size - off);
            if (n < 0)
            {
                perror("write");
                break;
            }
            off += (int) n;
        }
        close(fd[1]); // signals EOF to the reader
        free(buf);

        wait(&status);
        clock_gettime(CLOCK_REALTIME, &t2);

        // first round truncates the result file, later rounds append
        FILE *file = fopen("program1_result", i == 0 ? "w" : "a+");
        if (file != NULL)
        {
            long nsec = 1000000000L * (long) (t2.tv_sec - t1.tv_sec)
                + (t2.tv_nsec - t1.tv_nsec);
            fprintf(file, "%ld nsec\n", nsec);
            fclose(file);
        }
    }
    return 0;
}
CODE SHARED MEMORY:
#include <sys/ipc.h>
#include <sys/types.h>
#include <sys/shm.h>
#include <sys/time.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/sem.h>
#include <math.h>
#include <string.h>
#include <stdlib.h>
#include <time.h>
#include <wait.h>
#define COUNT 6
/*
 * Shared-memory benchmark.
 *
 * For COUNT payload sizes (8 bytes, growing 16x per round) the parent copies
 * a NUL-terminated payload into a System V shared memory segment and raises a
 * semaphore; the forked child spins on the semaphore and then copies the
 * payload out again.  The elapsed wall-clock time is appended to
 * "program2_result".
 */
int main()
{
    int pid, status, shmid, sem, base = 1;
    char *shmPtr;
    key_t shm_keys[COUNT], sem_keys[COUNT];

    for (int i = 0; i < COUNT; i++)
    {
        // ftok() is unspecified when the low 8 bits of the project id are
        // zero, so never pass 0 as the id
        shm_keys[i] = ftok(".", 'A' + i);
        sem_keys[i] = ftok(".", 'a' + i);
    }

    for (int i = 0; i < COUNT; i++)
    {
        int size = 8 * base;
        base *= 16;

        // size + 1: room for the payload's terminating NUL
        if ((shmid = shmget(shm_keys[i], size + 1, IPC_CREAT | 0666)) == -1) exit(1);
        shmPtr = shmat(shmid, 0, 0);
        if (shmPtr == (char *) -1)
        {
            shmctl(shmid, IPC_RMID, NULL);
            exit(1);
        }
        sem = semget(sem_keys[i], 1, IPC_CREAT | 0666);
        if (sem == -1)
        {
            shmdt(shmPtr);
            shmctl(shmid, IPC_RMID, NULL);
            exit(1);
        }
        // The keyed semaphore may be left over from an earlier run; start
        // every round from a known "not ready" value.
        semctl(sem, 0, SETVAL, 0);

        struct timespec t1, t2;
        pid = fork();
        if (pid == -1)
        {
            shmdt(shmPtr);
            shmctl(shmid, IPC_RMID, NULL);
            semctl(sem, 0, IPC_RMID);
            exit(1);
        }
        if (pid == 0)
        {
            // spin until the parent raises the semaphore; bail out if the
            // semaphore disappears instead of looping forever
            int val;
            while ((val = semctl(sem, 0, GETVAL)) != 1)
            {
                if (val == -1)
                    exit(1);
            }
            // the payload is size characters plus the NUL terminator
            char *str = malloc(size + 1);
            if (str == NULL)
                exit(1);
            strcpy(str, shmPtr);
            free(str);
            exit(0);
        }

        clock_gettime(CLOCK_REALTIME, &t1);
        char *str = malloc(size + 1);
        if (str == NULL)
        {
            // removing the semaphore makes the spinning child exit
            semctl(sem, 0, IPC_RMID);
            wait(&status);
            shmdt(shmPtr);
            shmctl(shmid, IPC_RMID, NULL);
            exit(1);
        }
        for (int j = 0; j < size; j++)
        {
            str[j] = '1';
        }
        str[size] = '\0';
        strcpy(shmPtr, str);
        semctl(sem, 0, SETVAL, 1); // release the child
        free(str);

        wait(&status);
        clock_gettime(CLOCK_REALTIME, &t2);

        // first round truncates the result file, later rounds append
        FILE *file = fopen("program2_result", i == 0 ? "w" : "a+");
        if (file != NULL)
        {
            long nsec = 1000000000L * (long) (t2.tv_sec - t1.tv_sec)
                + (t2.tv_nsec - t1.tv_nsec);
            fprintf(file, "%ld nsec\n", nsec);
            fclose(file);
        }

        shmdt(shmPtr);
        shmctl(shmid, IPC_RMID, NULL);
        semctl(sem, 0, IPC_RMID);
    }
    return 0;
}
RESULT PIPE:
RESULT SHARED MEMORY:
New result:
|bytes | pipe (nsec) | shm (nsec)|
|---------|-------------|-----------|
| 8 | 517516 | 489703 |
| 128 | 205485 | 440699 |
| 2048 | 374170 | 1162227 |
| 32768 | 584830 | 548490 |
| 524288 | 3162177 | 4010005 |
| 8388608 | 50293808 | 67142116 |
Please help understand what i do wrong.
What is the problem in my code? What am I doing wrong?
Shared memory, particularly for large amounts of data is almost always faster.
The two programs are not equivalent in what they measure, and this leads to erroneous results.
When corrected, the results clearly show that shared memory is faster (2x-3x faster). And, in terms of latency of the receiver, with shared memory, the receiver can start processing data much faster.
Consider the logical steps until the receiver can process data:
With a pipe, the sender does a `write`, which is a copying of the data. The receiver does a `read`, which is [another] copying of the data. In other words, the pipe version does two [unnecessary] copy operations that the shm version doesn't.
Also, consider the "latency". The time from when the sender has filled the buffer until the receiver is [capable of] processing the data. The pipe version still has to write
the data to the pipe. But, with the shm version, the receiver can start immediately after receiving the "notify" message (more on that in the update below).
Issues in the code ...
- `strcpy` is extraneous. It defeats the purpose of using shared memory.
- `strcpy` is doing extra work that the pipe version isn't doing. That is, the pipe version is only doing `read` -- no processing of the data.
- Because the segment is created before the `fork`, we can use `IPC_PRIVATE` instead of `ftok` (a bit cleaner).

Here is a refactored version. It is annotated with bugs/fixes.
- Both tests are combined into a single `.c` file.
- The `fopen` calls could be cleaned up.

#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
#include <wait.h>
#include <math.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#define sysfault(_fmt...) \
do { \
fprintf(stderr,_fmt); \
exit(1); \
} while (0)
#define Tmark (tscgetf() - tscbeg)
double tsczero;
// _tscgetf -- read CLOCK_MONOTONIC as fractional seconds relative to tsczero
double
_tscgetf(void)
{
    struct timespec now;

    clock_gettime(CLOCK_MONOTONIC,&now);

    // nanoseconds -> fraction of a second
    double frac = (double) now.tv_nsec / 1e9;

    // add the whole seconds, then rebase against the program's time origin
    double total = frac + (double) now.tv_sec;
    double rel = total - tsczero;

    return rel;
}
#define tscgetf() \
({ \
__asm__ __volatile__("nop\n" ::: "memory"); \
_tscgetf(); \
})
int base;
size_t size;
double t1;
double t2;
int shmid, sem;
char *shmPtr;
double tscbeg;
double tscsend[3];
double tscbest[3];
double tscrcv[3];
struct hdr {
const char *sym;
const char *reason;
};
#define HDRMAX (sizeof(hdrs) / sizeof(hdrs[0]))
#define _HDRPUT(_fmt,_data...) \
do { \
totlen = fprintf(file,_fmt,_data); \
for (; totlen < lim; ++totlen) \
fputc(' ',file); \
} while (0)
#define HDRPUT(_fmt,_data...) \
do { \
_HDRPUT(_fmt,_data); \
++ihdr; \
} while (0)
void
result(int i,const char *pre,double tbest)
{
int totlen;
int lim = 12;
char name[100];
sprintf(name,"%s_result",pre);
if (i == 0)
unlink(name);
FILE *file = fopen(name,"a");
static struct hdr hdrs[] = {
{ "ELAPSED", "total elapsed time" },
{ "Tsend", "sender unlocks receiver" },
{ "Twait", "receiver becomes ready for input" },
{ "Trcv", "receiver released by sender" },
{ "Latency", "Trcv - Twait (time receiver is delayed)" },
{ "size", "buffer size" },
};
int ihdr;
if (i == 0) {
fprintf(file,"Legend:\n");
for (ihdr = 0; ihdr < HDRMAX; ++ihdr) {
_HDRPUT("%s",hdrs[ihdr].sym);
fprintf(file,"-- %s\n",hdrs[ihdr].reason);
}
fprintf(file,"\n");
for (ihdr = 0; ihdr < HDRMAX; ++ihdr)
_HDRPUT("%s",hdrs[ihdr].sym);
fprintf(file,"\n");
}
ihdr = 0;
HDRPUT("%.9f",tbest);
HDRPUT("%.9f",tscsend[0]);
HDRPUT("%.9f",tscrcv[0]);
HDRPUT("%.9f",tscrcv[1]);
HDRPUT("%.9f",tscrcv[1] - tscrcv[0]);
#if 0
HDRPUT("%d/%d",base,size);
#else
fprintf(file,"%zu",size);
#endif
fprintf(file,"\n");
fclose(file);
}
#define COUNT 16
// dobest -- run one test function over COUNT buffer sizes (1, 4, 16, ...)
//
// pre -- test name (prefix of the result file)
// fnc -- performs a single timed trial; it must set t1/t2 and tscsend, and
//        its child must store its tscrcv timestamps at the start of shmPtr
//
// Each size is tried three times and the trial with the smallest t2 - t1 is
// reported through result().
void
dobest(const char *pre,void (*fnc)(int i,int dtflg))
{
    base = 1;

    for (int i = 0; i < COUNT; i++) {
        double tbest = 1e9;

        size = base;

        // the child hands its timestamps back through the start of the
        // segment, so never request less than sizeof(tscrcv) bytes
        size_t shmlen = size + 1;
        if (shmlen < sizeof(tscrcv))
            shmlen = sizeof(tscrcv);

        if ((shmid = shmget(IPC_PRIVATE, shmlen, IPC_CREAT | 0666)) == -1)
            sysfault("dobest: shmget failure -- %s\n",strerror(errno));

        shmPtr = shmat(shmid, 0, 0);
        if (shmPtr == (char *) -1) {
            int sverr = errno;
            shmctl(shmid, IPC_RMID, NULL);
            sysfault("dobest: shmat failure -- %s\n",strerror(sverr));
        }

        sem = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);
        if (sem == -1) {
            int sverr = errno;
            shmdt(shmPtr);
            shmctl(shmid, IPC_RMID, NULL);
            sysfault("dobest: semget failure -- %s\n",strerror(sverr));
        }

        // do multiple trials to minimize the effects of timeslicing and system
        // loading
        for (int iter = 3; iter > 0; --iter) {
            tscbeg = tscgetf();
            fnc(i,iter == 1);

            double tdif = t2 - t1;
            if (tdif < tbest) {
                tbest = tdif;
                // keep the receiver and sender timestamps of the best trial
                memcpy(tscrcv,shmPtr,sizeof(tscrcv));
                memcpy(tscbest,tscsend,sizeof(tscbest));
            }
        }

        // record the best result
        result(i,pre,tbest);

        shmdt(shmPtr);
        shmctl(shmid, IPC_RMID, NULL);
        semctl(sem, 0, IPC_RMID);

        // skip the step after the last test: 4^COUNT does not fit in an int
        if (i + 1 < COUNT)
            base *= 4;
    }
}
// pipeone -- one timed pipe trial
//
// The parent fills a buffer of "size" bytes and writes it to a pipe; the
// forked child reads until EOF and stores its timestamps (tscrcv) at the
// start of the shared segment.  On return t1/t2 bracket the parent's work
// (fill + write + waiting for the child) and tscsend[0] marks the moment the
// buffer was ready.
//
// i, dtflg -- unused here; present to match the dobest() callback signature
void
pipeone(int i,int dtflg)
{
    int fd[2];
    int status;
    pid_t pid;

    (void) i;
    (void) dtflg;

    if (pipe(fd) == -1)
        sysfault("pipeone: pipe failure -- %s\n",strerror(errno));

    // best effort: the kernel may refuse the requested capacity, in which
    // case the pipe simply keeps its default size
    fcntl(fd[1], F_SETPIPE_SZ, (int) size);
    fcntl(fd[0], F_SETPIPE_SZ, (int) size);

    char *buf = malloc(size);
    if (buf == NULL)
        sysfault("pipeone: malloc failure -- %s\n",strerror(errno));

    pid = fork();
    if (pid < 0)
        sysfault("pipeone: fork failure -- %s\n",strerror(errno));

    if (pid == 0) {
        // READ
        ssize_t nread;

        close(fd[1]);
        tscrcv[0] = Tmark;

        // stop on EOF _and_ on error (a "!= 0" test would spin on -1)
        int mark = 1;
        while ((nread = read(fd[0], buf, size)) > 0) {
            if (mark)
                tscrcv[1] = Tmark;      // first data has arrived
            mark = 0;
        }
        if (nread < 0)
            fprintf(stderr,"pipeone: read error -- %s\n",strerror(errno));

        memcpy(shmPtr,tscrcv,sizeof(tscrcv));
        exit(0);
    }

    // WRITE
    close(fd[0]);
    t1 = tscgetf();

    for (size_t j = 0; j < size; j++)
        buf[j] = '1';

    tscsend[0] = Tmark;

    // write() may accept only part of the buffer -- loop until all is sent
    size_t off = 0;
    while (off < size) {
        ssize_t xlen = write(fd[1], &buf[off], size - off);
        if (xlen < 0) {
            fprintf(stderr,"pipeone: write error -- %s\n",strerror(errno));
            break;
        }
        off += (size_t) xlen;
    }
    close(fd[1]);

    waitpid(pid,&status,0);
    t2 = tscgetf();

    free(buf);
}
key_t shm_keys[COUNT], sem_keys[COUNT];
// shmone -- one timed shared-memory trial
//
// The parent fills the shared segment in place and raises the semaphore; the
// forked child spins on the semaphore and then stores its timestamps
// (tscrcv) at the start of the segment.  On return t1/t2 bracket the
// parent's work and tscsend[0] marks the moment the buffer was ready.
//
// i, dtflg -- unused here; present to match the dobest() callback signature
void
shmone(int i,int dtflg)
{
    int status;
    pid_t pid;

    (void) i;
    (void) dtflg;

    // The semaphore is shared by all trials of one size and is left at 1 by
    // the previous trial.  Rearm it, otherwise the child would not wait at
    // all and could store its timestamps while the parent is still filling
    // the buffer.
    semctl(sem, 0, SETVAL, 0);

    pid = fork();
    if (pid < 0)
        sysfault("shmone: fork failure -- %s\n",strerror(errno));

    if (pid == 0) {
        int val;

        tscrcv[0] = Tmark;
        // spin until released; give up if the semaphore cannot be read
        while ((val = semctl(sem, 0, GETVAL)) != 1) {
            if (val == -1)
                sysfault("shmone: semctl failure -- %s\n",strerror(errno));
        }
        tscrcv[1] = Tmark;

// NOTE/BUG: not valid to copy from the shared buffer -- the whole point is to
// operate on the shared buffer directly!!!
#if 0
        char *str = (char *) malloc(size);
        strcpy(str, shmPtr);
        free(str);
        exit(1);
// NOTE/FIX: do _not_ copy out the buffer -- for testing purposes, we only want
// to measure the delivery time
#else
        size_t len = 0;
#if 0
        len = strlen(shmPtr);
#endif
        memcpy(shmPtr,tscrcv,sizeof(tscrcv));
        exit(len % 0x7f);
#endif
    }

    t1 = tscgetf();

#if 0
    char *str = (char *) malloc((size + 1) * sizeof(char));
#else
    char *str = shmPtr;
#endif
    for (size_t j = 0; j < size; j++)
        str[j] = '1';
    str[size] = '\0';
#if 0
    strcpy(shmPtr, str);
#endif

    tscsend[0] = Tmark;
    semctl(sem, 0, SETVAL, 1);          // release the child
#if 0
    free(str);
#endif

    waitpid(pid,&status,0);
    t2 = tscgetf();
}
// pipeall -- run the pipe trial (pipeone) through dobest, logging under the
// "pipe" prefix
void
pipeall(void)
{
    void (*trial)(int,int) = pipeone;

    dobest("pipe",trial);
}
// shmall -- run the shared-memory trial (shmone) through dobest, logging
// under the "shm" prefix
void
shmall(void)
{
    // dobest() creates the segment and the semaphore with IPC_PRIVATE, so no
    // ftok() keys have to be generated here
    dobest("shm",shmone);
}
// main -- select the test from the first command line argument ("pipe" or
// "shm")
int
main(int argc,char **argv)
{
    // establish the time origin used by all later tscgetf() readings
    tsczero = tscgetf();

    // argv[1] names the test to run
    if (argc < 2)
        sysfault("main: specify type: pipe or shm\n");

    char *type = argv[1];

    if (strcmp(type,"pipe") == 0)
        pipeall();
    else if (strcmp(type,"shm") == 0)
        shmall();
    else
        sysfault("main: unknown type -- '%s'\n",type);

    return 0;
}
In the code above, I've used cpp
conditionals to denote old vs. new code:
#if 0
// old code
#else
// new code
#endif
#if 1
// new code
#endif
Note: this can be cleaned up by running the file through unifdef -k
Here is the output of ./program pipe
:
Legend:
ELAPSED -- total elapsed time
Tsend -- sender unlocks receiver
Twait -- receiver becomes ready for input
Trcv -- receiver released by sender
Latency -- Trcv - Twait (time receiver is delayed)
size -- buffer size
ELAPSED Tsend Twait Trcv Latency size
0.000212793 0.000097429 0.000125559 0.000162328 0.000036769 1
0.000167312 0.000104965 0.000125442 0.000147394 0.000021952 4
0.000177647 0.000096102 0.000093215 0.000128481 0.000035266 16
0.000106641 0.000080761 0.000077737 0.000096395 0.000018658 64
0.000204464 0.000163753 0.000112410 0.000148289 0.000035879 256
0.000147881 0.000060119 0.000144019 0.000166102 0.000022083 1024
0.000108481 0.000072081 0.000080240 0.000099838 0.000019598 4096
0.000156202 0.000130236 0.000145147 0.000173962 0.000028815 16384
0.000233014 0.000144926 0.000076950 0.000188236 0.000111286 65536
0.000605337 0.000425224 0.000079762 0.000537632 0.000457870 262144
0.002080624 0.001375300 0.000150789 0.001993695 0.001842906 1048576
0.009131487 0.005655973 0.000243503 0.008914248 0.008670745 4194304
0.036384772 0.021980127 0.000213079 0.035238290 0.035025211 16777216
0.142985735 0.072771127 0.000424021 0.137922121 0.137498100 67108864
0.572007428 0.292893437 0.000403933 0.553239097 0.552835164 268435456
2.276978512 1.176618786 0.000574158 2.220540762 2.219966604 1073741824
Here is the output of ./program shm
:
Legend:
ELAPSED -- total elapsed time
Tsend -- sender unlocks receiver
Twait -- receiver becomes ready for input
Trcv -- receiver released by sender
Latency -- Trcv - Twait (time receiver is delayed)
size -- buffer size
ELAPSED Tsend Twait Trcv Latency size
0.000181048 0.000081319 0.000126865 0.000152599 0.000025734 1
0.000277881 0.000079515 0.000165932 0.000177756 0.000011824 4
0.000191934 0.000081238 0.000126623 0.000134039 0.000007416 16
0.000201862 0.000080135 0.000124149 0.000135244 0.000011095 64
0.000183210 0.000080799 0.000107105 0.000113932 0.000006827 256
0.000146235 0.000085935 0.000112573 0.000119606 0.000007033 1024
0.000144176 0.000085737 0.000000000 0.000000000 0.000000000 4096
0.000235418 0.000100743 0.000136712 0.000148129 0.000011417 16384
0.000172821 0.000160697 0.000123424 0.000134592 0.000011168 65536
0.000201004 0.000241614 0.000122755 0.000129961 0.000007206 262144
0.000748101 0.000788737 0.000091531 0.000098554 0.000007023 1048576
0.002963725 0.003005338 0.000150246 0.000157523 0.000007277 4194304
0.011919764 0.011992419 0.000192575 0.000201370 0.000008795 16777216
0.047795073 0.047874544 0.000200046 0.000209144 0.000009098 67108864
0.190522428 0.190601263 0.000178723 0.000187669 0.000008946 268435456
0.757310083 0.757391226 0.000177408 0.000186831 0.000009423 1073741824
Here is the cleaned up source:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
#include <wait.h>
#include <math.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#define sysfault(_fmt...) \
do { \
fprintf(stderr,_fmt); \
exit(1); \
} while (0)
#define Tmark (tscgetf() - tscbeg)
double tsczero;
// _tscgetf -- current CLOCK_MONOTONIC reading in seconds, measured from
// tsczero
double
_tscgetf(void)
{
    struct timespec mono;
    double secs;

    clock_gettime(CLOCK_MONOTONIC,&mono);

    // fractional part first, then the whole seconds
    secs = (double) mono.tv_nsec;
    secs = secs / 1e9;
    secs = secs + (double) mono.tv_sec;

    // make the value relative to the program's time origin
    secs = secs - tsczero;

    return secs;
}
#define tscgetf() \
({ \
__asm__ __volatile__("nop\n" ::: "memory"); \
_tscgetf(); \
})
int base;
size_t size;
double t1;
double t2;
int shmid, sem;
char *shmPtr;
double tscbeg;
double tscsend[3];
double tscbest[3];
double tscrcv[3];
struct hdr {
const char *sym;
const char *reason;
};
#define HDRMAX (sizeof(hdrs) / sizeof(hdrs[0]))
#define _HDRPUT(_fmt,_data...) \
do { \
totlen = fprintf(file,_fmt,_data); \
for (; totlen < lim; ++totlen) \
fputc(' ',file); \
} while (0)
#define HDRPUT(_fmt,_data...) \
do { \
_HDRPUT(_fmt,_data); \
++ihdr; \
} while (0)
void
result(int i,const char *pre,double tbest)
{
int totlen;
int lim = 12;
char name[100];
sprintf(name,"%s_result",pre);
if (i == 0)
unlink(name);
FILE *file = fopen(name,"a");
static struct hdr hdrs[] = {
{ "ELAPSED", "total elapsed time" },
{ "Tsend", "sender unlocks receiver" },
{ "Twait", "receiver becomes ready for input" },
{ "Trcv", "receiver released by sender" },
{ "Latency", "Trcv - Twait (time receiver is delayed)" },
{ "size", "buffer size" },
};
int ihdr;
if (i == 0) {
fprintf(file,"Legend:\n");
for (ihdr = 0; ihdr < HDRMAX; ++ihdr) {
_HDRPUT("%s",hdrs[ihdr].sym);
fprintf(file,"-- %s\n",hdrs[ihdr].reason);
}
fprintf(file,"\n");
for (ihdr = 0; ihdr < HDRMAX; ++ihdr)
_HDRPUT("%s",hdrs[ihdr].sym);
fprintf(file,"\n");
}
ihdr = 0;
HDRPUT("%.9f",tbest);
HDRPUT("%.9f",tscsend[0]);
HDRPUT("%.9f",tscrcv[0]);
HDRPUT("%.9f",tscrcv[1]);
HDRPUT("%.9f",tscrcv[1] - tscrcv[0]);
fprintf(file,"%zu",size);
fprintf(file,"\n");
fclose(file);
}
#define COUNT 16
// dobest -- run one test function over COUNT buffer sizes (1, 4, 16, ...)
//
// pre -- test name (prefix of the result file)
// fnc -- performs a single timed trial; it must set t1/t2 and tscsend, and
//        its child must store its tscrcv timestamps at the start of shmPtr
//
// Each size is tried three times and the trial with the smallest t2 - t1 is
// reported through result().
void
dobest(const char *pre,void (*fnc)(int i,int dtflg))
{
    base = 1;

    for (int i = 0; i < COUNT; i++) {
        double tbest = 1e9;

        size = base;

        // the child hands its timestamps back through the start of the
        // segment, so never request less than sizeof(tscrcv) bytes
        size_t shmlen = size + 1;
        if (shmlen < sizeof(tscrcv))
            shmlen = sizeof(tscrcv);

        if ((shmid = shmget(IPC_PRIVATE, shmlen, IPC_CREAT | 0666)) == -1)
            sysfault("dobest: shmget failure -- %s\n",strerror(errno));

        shmPtr = shmat(shmid, 0, 0);
        if (shmPtr == (char *) -1) {
            int sverr = errno;
            shmctl(shmid, IPC_RMID, NULL);
            sysfault("dobest: shmat failure -- %s\n",strerror(sverr));
        }

        sem = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);
        if (sem == -1) {
            int sverr = errno;
            shmdt(shmPtr);
            shmctl(shmid, IPC_RMID, NULL);
            sysfault("dobest: semget failure -- %s\n",strerror(sverr));
        }

        // do multiple trials to minimize the effects of timeslicing and system
        // loading
        for (int iter = 3; iter > 0; --iter) {
            tscbeg = tscgetf();
            fnc(i,iter == 1);

            double tdif = t2 - t1;
            if (tdif < tbest) {
                tbest = tdif;
                // keep the receiver and sender timestamps of the best trial
                memcpy(tscrcv,shmPtr,sizeof(tscrcv));
                memcpy(tscbest,tscsend,sizeof(tscbest));
            }
        }

        // record the best result
        result(i,pre,tbest);

        shmdt(shmPtr);
        shmctl(shmid, IPC_RMID, NULL);
        semctl(sem, 0, IPC_RMID);

        // skip the step after the last test: 4^COUNT does not fit in an int
        if (i + 1 < COUNT)
            base *= 4;
    }
}
// pipeone -- one timed pipe trial
//
// The parent fills a buffer of "size" bytes and writes it to a pipe; the
// forked child reads until EOF and stores its timestamps (tscrcv) at the
// start of the shared segment.  On return t1/t2 bracket the parent's work
// (fill + write + waiting for the child) and tscsend[0] marks the moment the
// buffer was ready.
//
// i, dtflg -- unused here; present to match the dobest() callback signature
void
pipeone(int i,int dtflg)
{
    int fd[2];
    int status;
    pid_t pid;

    (void) i;
    (void) dtflg;

    if (pipe(fd) == -1)
        sysfault("pipeone: pipe failure -- %s\n",strerror(errno));

    // best effort: the kernel may refuse the requested capacity, in which
    // case the pipe simply keeps its default size
    fcntl(fd[1], F_SETPIPE_SZ, (int) size);
    fcntl(fd[0], F_SETPIPE_SZ, (int) size);

    char *buf = malloc(size);
    if (buf == NULL)
        sysfault("pipeone: malloc failure -- %s\n",strerror(errno));

    pid = fork();
    if (pid < 0)
        sysfault("pipeone: fork failure -- %s\n",strerror(errno));

    if (pid == 0) {
        // READ
        ssize_t nread;

        close(fd[1]);
        tscrcv[0] = Tmark;

        // stop on EOF _and_ on error (a "!= 0" test would spin on -1)
        int mark = 1;
        while ((nread = read(fd[0], buf, size)) > 0) {
            if (mark)
                tscrcv[1] = Tmark;      // first data has arrived
            mark = 0;
        }
        if (nread < 0)
            fprintf(stderr,"pipeone: read error -- %s\n",strerror(errno));

        memcpy(shmPtr,tscrcv,sizeof(tscrcv));
        exit(0);
    }

    // WRITE
    close(fd[0]);
    t1 = tscgetf();

    for (size_t j = 0; j < size; j++)
        buf[j] = '1';

    tscsend[0] = Tmark;

    // write() may accept only part of the buffer -- loop until all is sent
    size_t off = 0;
    while (off < size) {
        ssize_t xlen = write(fd[1], &buf[off], size - off);
        if (xlen < 0) {
            fprintf(stderr,"pipeone: write error -- %s\n",strerror(errno));
            break;
        }
        off += (size_t) xlen;
    }
    close(fd[1]);

    waitpid(pid,&status,0);
    t2 = tscgetf();

    free(buf);
}
key_t shm_keys[COUNT], sem_keys[COUNT];
// shmone -- one timed shared-memory trial
//
// The parent fills the shared segment in place and raises the semaphore; the
// forked child spins on the semaphore and then stores its timestamps
// (tscrcv) at the start of the segment.  On return t1/t2 bracket the
// parent's work and tscsend[0] marks the moment the buffer was ready.
//
// i, dtflg -- unused here; present to match the dobest() callback signature
void
shmone(int i,int dtflg)
{
    int status;
    pid_t pid;

    (void) i;
    (void) dtflg;

    // The semaphore is shared by all trials of one size and is left at 1 by
    // the previous trial.  Rearm it, otherwise the child would not wait at
    // all and could store its timestamps while the parent is still filling
    // the buffer.
    semctl(sem, 0, SETVAL, 0);

    pid = fork();
    if (pid < 0)
        sysfault("shmone: fork failure -- %s\n",strerror(errno));

    if (pid == 0) {
        int val;

        tscrcv[0] = Tmark;
        // spin until released; give up if the semaphore cannot be read
        while ((val = semctl(sem, 0, GETVAL)) != 1) {
            if (val == -1)
                sysfault("shmone: semctl failure -- %s\n",strerror(errno));
        }
        tscrcv[1] = Tmark;

        // NOTE: the child does _not_ copy the buffer out -- the whole point
        // is to operate on the shared buffer directly; only the delivery
        // time is measured
        size_t len = 0;
        memcpy(shmPtr,tscrcv,sizeof(tscrcv));
        exit(len % 0x7f);
    }

    t1 = tscgetf();

    // fill the shared buffer in place
    char *str = shmPtr;
    for (size_t j = 0; j < size; j++)
        str[j] = '1';
    str[size] = '\0';

    tscsend[0] = Tmark;
    semctl(sem, 0, SETVAL, 1);          // release the child

    waitpid(pid,&status,0);
    t2 = tscgetf();
}
// pipeall -- hand the pipe trial function to dobest; results are logged with
// the "pipe" prefix
void
pipeall(void)
{
    const char *prefix = "pipe";

    dobest(prefix,pipeone);
}
// shmall -- hand the shared-memory trial function to dobest; results are
// logged with the "shm" prefix
void
shmall(void)
{
    // no ftok() keys are generated: dobest() creates the segment and the
    // semaphore with IPC_PRIVATE
    dobest("shm",shmone);
}
int
main(int argc,char **argv)
{
--argc;
++argv;
tsczero = tscgetf();
if (argc < 1)
sysfault("main: specify type: pipe or shm\n");
do {
char *cp = *argv;
if (strcmp(cp,"pipe") == 0) {
pipeall();
break;
}
if (strcmp(cp,"shm") == 0) {
shmall();
break;
}
sysfault("main: unknown type -- '%s'\n",cp);
} while (0);
return 0;
}
UPDATE:
I've done some considerable cleanup on the test program. Due to some objections, I've added some options:
-C<n> -- test count
-I<n> -- iteration count
-m<0/1> -- notification (1=msgsnd/msgrcv, 0=sem*)
-S<0/1> -- 1=pipe set maximum size (F_SETPIPE_SZ)
-x<0/1> -- 1=child scan/process data
-v<0/1> -- 1=copy results to stdout
It turns out that using sem*
for notification [as posted here] is not as reliable as msgsnd/msgrcv
[which is what I traditionally use]. Using sem*
can produce a negative latency value for some tests. This indicates that the mechanism isn't robust [for some reason]. So, I replaced it with msgsnd/msgrcv
I added the `-x` option to force the receiver to "do work" on the buffer. This had a slight effect. As I mentioned, we want to measure the transfer time (and not the transfer time + processing time), so I'd leave off `-x`, but it's there for completeness.
Also, if no "test" argument is given, the program will run the "pipe" test and then the "shm" test sequentially (in subprocesses) as a convenience.
More details about the options are in the code:
#define _GNU_SOURCE
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <string.h>
#include <errno.h>
#include <time.h>
#include <sys/time.h>
#include <wait.h>
#include <math.h>
#include <fcntl.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/shm.h>
#include <sys/sem.h>
#include <sys/msg.h>
int opt_C = 16;
int opt_I = 3;
int opt_m = 1;
int opt_S = 1;
int opt_x;
int opt_v;
#define sysfault(_fmt...) \
do { \
fprintf(stderr,_fmt); \
exit(1); \
} while (0)
#define Tmark (tscgetf() - tscbeg)
double tsczero;
// _tscgetf -- CLOCK_MONOTONIC time in fractional seconds, relative to tsczero
double
_tscgetf(void)
{
    struct timespec tv;

    clock_gettime(CLOCK_MONOTONIC,&tv);

    // scale the nanoseconds down to a fraction of a second
    double nsfrac = (double) tv.tv_nsec / 1e9;

    // whole seconds on top, then subtract the program's time origin
    double abssec = nsfrac + (double) tv.tv_sec;
    double relsec = abssec - tsczero;

    return relsec;
}
// barrier -- compiler barrier: the "memory" clobber tells the compiler that
// memory may have changed, so it will not move memory accesses across this
// point.  Use it as a statement: "barrier;"
//
// NOTE: the definition must end here.  A trailing backslash after the asm
// would splice the following #define into this macro and leave tscgetf()
// undefined.
#define barrier \
    __asm__ __volatile__("nop\n" ::: "memory")

// tscgetf -- read the relative clock (_tscgetf) with a compiler barrier in
// front of the call
#define tscgetf() \
    ({ \
        barrier; \
        _tscgetf(); \
    })
int base;
size_t size;
double t1;
double t2;
int shmid;
int sem;
char *shmPtr;
char *fake;
int forkflg;
double tscbeg;
double tscsend[3];
double tscbest[3];
double tscrcv[3];
struct msg {
long msg_type;
int msg_payload;
};
#define MSG_TYPE 17
enum {
Tnotify
};
enum {
Tready,
Trunning,
};
char result_file[100];
struct hdr {
const char *sym;
const char *reason;
};
#define HDRMAX (sizeof(hdrs) / sizeof(hdrs[0]))
#define _HDRPUT(_fmt,_data...) \
do { \
totlen = fprintf(xf,_fmt,_data); \
for (; totlen < lim; ++totlen) \
fputc(' ',xf); \
} while (0)
#define HDRPUT(_fmt,_data...) \
do { \
_HDRPUT(_fmt,_data); \
++ihdr; \
} while (0)
// optshow -- print the current option settings, one per line, to xf
void
optshow(FILE *xf)
{
    // option letter, where its value lives, and its description
    static const struct {
        char key;
        const int *val;
        const char *what;
    } rows[] = {
        { 'C', &opt_C, "test count" },
        { 'I', &opt_I, "iteration count" },
        { 'm', &opt_m, "notification (1=msgsnd/msgrcv, 0=sem*)" },
        { 'S', &opt_S, "1=pipe set maximum size (F_SETPIPE_SZ)" },
        { 'x', &opt_x, "1=child scan/process data" },
    };

    fputs("options:\n",xf);

    for (size_t idx = 0; idx < sizeof(rows) / sizeof(rows[0]); ++idx)
        fprintf(xf," %c=%d -- %s\n",
            rows[idx].key,*rows[idx].val,rows[idx].what);
}
// result -- append one row of timings to result_file
//
// i     -- test index; index 0 recreates the file and writes the test name,
//          the option settings, the legend and the column headings first
// pre   -- test name (shown in the "TEST:" line)
// tbest -- best (smallest) elapsed time of the trials for this size
//
// The remaining columns come from the globals tscbest, tscrcv and size.
void
result(int i,const char *pre,double tbest)
{
    int totlen;
    int lim = opt_m ? 12 : 13;          // column width used by _HDRPUT

    if (i == 0)
        unlink(result_file);

    FILE *xf = fopen(result_file,"a");
    if (xf == NULL)
        sysfault("result: unable to open '%s' -- %s\n",
            result_file,strerror(errno));

    static struct hdr hdrs[] = {
        { "ELAPSED", "total elapsed time" },
        { "Tnotify", "sender buffer filled -- receiver notified" },
        { "Tready", "receiver becomes ready for input" },
        { "Trunning", "receiver released by sender" },
        { "Latency", "Trunning - Tnotify (time receiver is delayed)" },
        { "size", "buffer size" },
    };
    size_t ihdr;

    if (i == 0) {
        fprintf(xf,"TEST: %s ...\n",pre);
        optshow(xf);
        fprintf(xf,"\n");

        fprintf(xf,"Legend:\n");
        for (ihdr = 0; ihdr < HDRMAX; ++ihdr) {
            _HDRPUT("%s",hdrs[ihdr].sym);
            fprintf(xf,"-- %s\n",hdrs[ihdr].reason);
        }
        fprintf(xf,"\n");

        for (ihdr = 0; ihdr < HDRMAX; ++ihdr)
            _HDRPUT("%s",hdrs[ihdr].sym);
        fprintf(xf,"\n");
    }

    ihdr = 0;
    HDRPUT("%.9f",tbest);
    HDRPUT("%.9f",tscbest[Tnotify]);
    HDRPUT("%.9f",tscrcv[Tready]);
    HDRPUT("%.9f",tscrcv[Trunning]);
    HDRPUT("%.9f",tscrcv[Trunning] - tscbest[Tnotify]);
    fprintf(xf,"%zu",size);
    fprintf(xf,"\n");

    fclose(xf);
}
// result_show -- with -v, copy result_file to stdout by running "cat" in a
// child process and waiting for it; does nothing when opt_v is off
void
result_show(void)
{
    if (! opt_v)
        return;

    if (result_file[0] == 0)
        sysfault("result_show: null result_file\n");

    char *av[] = { "cat", result_file, NULL };

    // nothing buffered may be inherited (and printed twice) by the child
    fflush(stdout);

    pid_t pid = fork();
    if (pid < 0)
        sysfault("result_show: fork failure -- %s\n",strerror(errno));

    if (pid == 0) {
        printf("\n");
        // the exec discards stdio buffers, so push the newline out first
        fflush(stdout);
        execvp(av[0],av);
        sysfault("result_show: exec fault -- %s\n",strerror(errno));
    }

    waitpid(pid,NULL,0);
}
// scan -- with -x, make the receiver touch the data: search the len bytes at
// buf for the character '2' and leave the memchr result in the global "fake"
void
scan(char *buf,size_t len)
{
    // option off: leave the buffer (and fake) untouched
    if (! opt_x)
        return;

    fake = memchr(buf,'2',len);
}
// notify_open -- create the notification object selected by -m and store its
// id in "sem": a message queue (opt_m set) or a one-element semaphore set
void
notify_open(void)
{
    if (opt_m) {
        sem = msgget(IPC_PRIVATE,IPC_CREAT | 0666);
        if (sem == -1)
            sysfault("notify_open: msgget failure -- %s\n",strerror(errno));
    }
    else {
        sem = semget(IPC_PRIVATE, 1, IPC_CREAT | 0666);
        if (sem == -1)
            sysfault("notify_open: semget failure -- %s\n",strerror(errno));

        // do not rely on the initial value of a new semaphore: start in the
        // "not notified" state explicitly
        semctl(sem, 0, SETVAL, 0);
    }
}
// notify_close -- remove the notification object created by notify_open()
void
notify_close(void)
{
    if (opt_m) {
        // msgctl() takes (id, cmd, buf) -- the command is the _second_
        // argument; there is no element index as with semctl()
        msgctl(sem, IPC_RMID, NULL);
    }
    else
        semctl(sem, 0, IPC_RMID);
}
// notify_send -- sender side: record the Tnotify timestamp, then wake the
// receiver through the message queue (opt_m set) or the semaphore
void
notify_send(void)
{
    struct msg msg;

    // build the message before taking the timestamp; clearing the whole
    // struct keeps uninitialized bytes out of the queue
    memset(&msg,0,sizeof(msg));
    msg.msg_type = MSG_TYPE;

    tscsend[Tnotify] = Tmark;

    if (opt_m) {
        // the length passed to msgsnd excludes the leading type field
        if (msgsnd(sem,&msg,sizeof(msg) - sizeof(msg.msg_type),0) == -1)
            sysfault("notify_send: msgsnd failure -- %s\n",strerror(errno));
    }
    else
        semctl(sem, 0, SETVAL, 1);
}
// notify_wait -- receiver side: record Tready, wait for the sender's
// notification, then record Trunning
//
// The notification is consumed in both modes: msgrcv removes the message and
// the semaphore is put back to 0.  Without that reset the semaphore would
// still read 1 on the next trial and the receiver would run ahead of the
// sender (Trunning earlier than Tnotify, i.e. a negative latency).
void
notify_wait(void)
{
    struct msg msg;

    tscrcv[Tready] = Tmark;

    if (opt_m) {
        // retry when interrupted by a signal; anything else is fatal
        while (msgrcv(sem,&msg,sizeof(msg) - sizeof(msg.msg_type),
            MSG_TYPE,0) == -1) {
            if (errno != EINTR)
                sysfault("notify_wait: msgrcv failure -- %s\n",
                    strerror(errno));
        }
        tscrcv[Trunning] = Tmark;
    }
    else {
        int val;

        // spin until released; give up if the semaphore cannot be read
        while ((val = semctl(sem, 0, GETVAL)) != 1) {
            if (val == -1)
                sysfault("notify_wait: semctl failure -- %s\n",
                    strerror(errno));
        }
        tscrcv[Trunning] = Tmark;

        // consume the notification (after the timestamp, so the reset is
        // not counted as latency)
        semctl(sem, 0, SETVAL, 0);
    }
}
// dobest -- run one test function over opt_C buffer sizes (1, 4, 16, ...)
//
// pre -- test name; results go to "<pre>_result"
// fnc -- performs a single timed trial; it must set t1/t2 and tscsend, and
//        its child must store its tscrcv timestamps at the start of shmPtr
//
// Each size is tried opt_I times and the trial with the smallest t2 - t1 is
// reported through result().  When forkflg is set the whole test runs in a
// subprocess and the caller only waits for it and shows the result file.
void
dobest(const char *pre,void (*fnc)(int i,int dtflg))
{
    // sizes are 4^i held in an int: 4^15 is the largest that fits
    enum { TESTMAX = 16 };

    if (opt_C > TESTMAX)
        sysfault("dobest: test count %d too large (maximum is %d)\n",
            opt_C,(int) TESTMAX);

    if (! opt_v) {
        printf("\n");
        printf("dobest: %s ...\n",pre);
    }
    fflush(stdout);

    snprintf(result_file,sizeof(result_file),"%s_result",pre);

    if (forkflg) {
        pid_t pid = fork();
        if (pid < 0)
            sysfault("dobest: fork failure -- %s\n",strerror(errno));
        if (pid != 0) {
            waitpid(pid,NULL,0);
            result_show();
            return;
        }
    }

    base = 1;

    for (int i = 0; i < opt_C; i++) {
        double tbest = 1e9;

        size = base;

        // the child hands its timestamps back through the start of the
        // segment, so never request less than sizeof(tscrcv) bytes
        size_t shmlen = size + 1;
        if (shmlen < sizeof(tscrcv))
            shmlen = sizeof(tscrcv);

        if ((shmid = shmget(IPC_PRIVATE, shmlen, IPC_CREAT | 0666)) == -1)
            sysfault("dobest: shmget failure -- %s\n",strerror(errno));

        shmPtr = shmat(shmid, 0, 0);
        if (shmPtr == (char *) -1) {
            int sverr = errno;
            shmctl(shmid, IPC_RMID, NULL);
            sysfault("dobest: shmat failure -- %s\n",strerror(sverr));
        }

        notify_open();

        // do multiple trials to minimize the effects of timeslicing and system
        // loading
        for (int iter = opt_I; iter > 0; --iter) {
            tscbeg = tscgetf();
            barrier;
            fnc(i,iter == 1);

            double tdif = t2 - t1;
            if (tdif < tbest) {
                tbest = tdif;
                // keep the receiver and sender timestamps of the best trial
                memcpy(tscrcv,shmPtr,sizeof(tscrcv));
                memcpy(tscbest,tscsend,sizeof(tscbest));
            }
        }

        // record the best result
        result(i,pre,tbest);

        shmdt(shmPtr);
        shmctl(shmid, IPC_RMID, NULL);
        notify_close();

        // skip the step after the last test so base never overflows
        if (i + 1 < opt_C)
            base *= 4;
    }

    if (forkflg)
        exit(0);

    result_show();
}
// pipeone -- one timed pipe trial
//
// The parent fills a buffer of "size" bytes and writes it to a pipe; the
// forked child reads (and optionally scans) until EOF and stores its
// timestamps (tscrcv) at the start of the shared segment.  On return t1/t2
// bracket the parent's work (fill + write + waiting for the child) and
// tscsend[0] marks the moment the buffer was ready.
//
// i, dtflg -- unused here; present to match the dobest() callback signature
void
pipeone(int i,int dtflg)
{
    int fd[2];
    ssize_t nread;
    int status;
    pid_t pid;

    (void) i;
    (void) dtflg;

    if (pipe(fd) == -1)
        sysfault("pipeone: pipe failure -- %s\n",strerror(errno));

    // best effort: the kernel may refuse the requested capacity, in which
    // case the pipe simply keeps its default size
    if (opt_S) {
        fcntl(fd[1], F_SETPIPE_SZ, (int) size);
        fcntl(fd[0], F_SETPIPE_SZ, (int) size);
    }

    char *buf = malloc(size);
    if (buf == NULL)
        sysfault("pipeone: malloc failure -- %s\n",strerror(errno));

    pid = fork();
    if (pid < 0)
        sysfault("pipeone: fork failure -- %s\n",strerror(errno));

    if (pid == 0) {
        // READ
        close(fd[1]);
        tscrcv[0] = Tmark;

        int mark = 1;
        while (1) {
            nread = read(fd[0], buf, size);
            int sverr = errno;

            // EOF: the writer closed its end
            if (nread == 0)
                break;

            // timestamp the first return from read
            if (mark)
                tscrcv[1] = Tmark;
            mark = 0;

            if (nread < 0) {
                printf("pipeone: read error -- %s\n",strerror(sverr));
                break;
            }

            scan(buf,nread);
        }

        memcpy(shmPtr,tscrcv,sizeof(tscrcv));
        exit(0);
    }

    // WRITE
    close(fd[0]);
    t1 = tscgetf();

    for (size_t j = 0; j < size; j++)
        buf[j] = '1';

    tscsend[0] = Tmark;

    // write() may accept only part of the buffer -- loop until all is sent
    size_t off = 0;
    ssize_t xlen;
    for (; off < size; off += xlen) {
        xlen = write(fd[1], &buf[off], size - off);
        if (xlen < 0) {
            printf("pipeone: write error off=%zu size=%zu -- %s\n",
                off,size,strerror(errno));
            break;
        }
    }
    close(fd[1]);

    waitpid(pid,&status,0);
    t2 = tscgetf();

    free(buf);
}
// shmone -- one timed shared-memory trial
//
// The parent fills the shared segment in place and notifies the forked
// child; the child waits for the notification, optionally scans the buffer
// and stores its timestamps (tscrcv) at the start of the segment.  On return
// t1/t2 bracket the parent's work (fill + notify + waiting for the child).
//
// i, dtflg -- unused here; present to match the dobest() callback signature
void
shmone(int i,int dtflg)
{
    int status;
    pid_t pid;

    (void) i;
    (void) dtflg;

    pid = fork();
    // without a receiver the notification sent below would stay queued and
    // release the _next_ trial's child too early
    if (pid < 0)
        sysfault("shmone: fork failure -- %s\n",strerror(errno));

    if (pid == 0) {
        notify_wait();

        // NOTE: the child does _not_ copy the buffer out -- the whole point
        // is to operate on the shared buffer directly
        scan(shmPtr,size);

        memcpy(shmPtr,tscrcv,sizeof(tscrcv));
        exit(0);
    }

    t1 = tscgetf();

    // fill the shared buffer in place
    char *str = shmPtr;
    for (size_t j = 0; j < size; j++)
        str[j] = '1';
    str[size] = '\0';

    notify_send();

    waitpid(pid,&status,0);
    t2 = tscgetf();
}
// main -- parse the leading "-<letter>[<number>]" options, then run the test
// named by the next argument ("pipe" or "shm"); with no test argument both
// tests are run, each in its own subprocess
int
main(int argc,char **argv)
{
    char *cp;

    // step over the program name
    --argc;
    ++argv;

    setlinebuf(stdout);
    tsczero = tscgetf();

    printf("main (%d)\n",getpid());

    // options: the value follows the letter directly; a bare letter means 1
    while (argc > 0) {
        char *arg = *argv;
        int *optvar = NULL;

        if (*arg != '-')
            break;

        switch (arg[1]) {
        case 'C':                       // number of trials
            optvar = &opt_C;
            break;
        case 'I':                       // number of iterations
            optvar = &opt_I;
            break;
        case 'm':                       // notification message type
            optvar = &opt_m;
            break;
        case 'S':                       // pipe sender should issue F_SETPIPE_SZ
            optvar = &opt_S;
            break;
        case 'v':                       // dump log files to stdout
            optvar = &opt_v;
            break;
        case 'x':                       // child should scan data
            optvar = &opt_x;
            break;
        default:
            sysfault("main: unknown option -- '%s'\n",arg);
            break;
        }

        cp = arg + 2;
        if (*cp != 0)
            *optvar = atoi(cp);
        else
            *optvar = 1;

        --argc;
        ++argv;
    }

    if (! opt_v)
        optshow(stdout);

    cp = *argv;

    if (argc < 1) {
        // no test named: run both, each in a subprocess
        forkflg = 1;
        dobest("pipe",pipeone);
        dobest("shm",shmone);
    }
    else if (strcmp(cp,"pipe") == 0)
        dobest("pipe",pipeone);
    else if (strcmp(cp,"shm") == 0)
        dobest("shm",shmone);
    else
        sysfault("main: unknown type -- '%s'\n",cp);

    return 0;
}