Search code examples
ccircular-buffer

circular buffer with read only access from reader


I have a problem with a circular buffer I want to build where the reader only has read-only access. In order to achieve a smooth rollover, I have the writer to set an id in iterator+1 of the rollover data structure to 0 for which I check in with the reader. My algorith seems to work fine until the first roll over, then for some reason, the resder will read 0 from the id which the writer obviously has set. i have some compileable example code to demonstrate the problem right here:

#include <stdio.h>
#include <time.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

#define NUM_ALM 5
#define ERROR   -1
#define OK      0

//even IDs = alarm active
//odd IDs  = alarm clear
enum alarmid {
    BFD_ACT     = 0x02,
    BFD_CLR     = 0x03,
    LOS_ACT     = 0x0C
};
typedef struct alarm_s {
    long timestamp;
    int alarmid;
    int arg1;
    int arg2;
}alarm_t;
int alarm_add(int id, int arg1, int arg2);
int next_alarm_read(alarm_t *res);
void *alarm_reader(void *arg);

static alarm_t *roller;
pthread_cond_t cv;
pthread_mutex_t mutex;
int main (void)
{
    int i =0;
    alarm_t dat;
    pthread_t reader;
    int ret;

    roller = calloc(NUM_ALM,sizeof(alarm_t));
    printf("allocated memory: %lukB\n",(sizeof(alarm_t)*NUM_ALM)/1024);

    for (i = 1; i< NUM_ALM; i++){
        alarm_add(LOS_ACT,i,0);
    }
    ret = pthread_create(&reader,NULL,alarm_reader,NULL);
    if (ret){
        printf("Error - pthread_create() return code: %d\n",ret);
        return ERROR;
    }
    sleep(1);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_ACT,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_CLR,8,0);
    alarm_add(BFD_ACT,8,0);

    pthread_join(reader,NULL);
}

void *alarm_reader(void *arg)
{
    static alarm_t dat={0};
    int err = 0;
    while(err <= 2)
    {
        if (next_alarm_read(&dat)== OK)
            printf("read alarm id %d, arg1 %d,arg2 %d\n",dat.alarmid,dat.arg1,dat.arg2);
        else{
            printf("alarm_reader() next_alarm_read() returned ERROR, wait\n");
            pthread_mutex_lock(&mutex);
            pthread_cond_wait(&cv, &mutex);
            pthread_mutex_unlock(&mutex);

            err++;
        }
    }
    printf("alarm_reader exit!\n");
}
int alarm_add(int id, int arg1, int arg2)
{
    static int i = 0;
    alarm_t dat={0};
    if (i<NUM_ALM){
        dat.timestamp = time(NULL);
        dat.alarmid = id;
        dat.arg1 = arg1;
        dat.arg2 = arg2;

        if (&roller[i]){
            memcpy(&roller[i],&dat,sizeof(alarm_t));
            if (i+1<NUM_ALM)
                roller[i+1].alarmid = 0;
            else
                roller[0].alarmid = 0;
            pthread_cond_signal(&cv);
            printf("added id %d, arg1 %d, arg2 %d @%d\n",roller[i].alarmid,roller[i].arg1,roller[i].arg2,i);
            i++;
        }
    } else {
        i = 0;
    }
    return 0;
}

int next_alarm_read(alarm_t *res)
{
    static int i = 0;
    static long prev_time = 0;
    if (!res)
        return ERROR;

    if (i<NUM_ALM)
    {
        if (roller[i].alarmid!=0){
            printf("next_alarm_read() reading @%d\n",i);
            res->timestamp = roller[i].timestamp;
            res->alarmid = roller[i].alarmid;
            res->arg1 = roller[i].arg1;
            res->arg2 = roller[i].arg2;
            prev_time = roller[i].timestamp;
            i++;
        } else {
            printf("next_alarm_read() @%d is %d,return ERROR\n",i,roller[i].alarmid);

            return ERROR;
        }
    } else {
        i = 0;
    }
    return OK;
}

Where the outpout looks like:

added id 12, arg1 1, arg2 0 @0
added id 12, arg1 2, arg2 0 @1
added id 12, arg1 3, arg2 0 @2
added id 12, arg1 4, arg2 0 @3
next_alarm_read() reading @0
read alarm id 12, arg1 1,arg2 0
next_alarm_read() reading @1
read alarm id 12, arg1 2,arg2 0
next_alarm_read() reading @2
read alarm id 12, arg1 3,arg2 0
next_alarm_read() reading @3
read alarm id 12, arg1 4,arg2 0
next_alarm_read() @4 is 0,return ERROR
alarm_reader() next_alarm_read() returned ERROR, wait
added id 2, arg1 8, arg2 0 @4
added id 2, arg1 8, arg2 0 @0
added id 2, arg1 8, arg2 0 @1
added id 3, arg1 8, arg2 0 @2
added id 3, arg1 8, arg2 0 @3
added id 3, arg1 8, arg2 0 @4
added id 2, arg1 8, arg2 0 @0
next_alarm_read() reading @4
read alarm id 3, arg1 8,arg2 0
read alarm id 3, arg1 8,arg2 0
next_alarm_read() reading @0
read alarm id 2, arg1 8,arg2 0
next_alarm_read() @1 is 0,return ERROR
alarm_reader() next_alarm_read() returned ERROR, wait

the bottom print for next_alarm_read() @1 is 0,return ERROR is wrong, the id should be 2. Why does this not work as intended I'nm wondering?


Solution

  • A few issues:

    I'm not sure what if (&roller[i]) is supposed to do/mean.

    The sleep in main isn't really needed and I suspect it's an attempt to ameliorate the other issues below.

    alarm_add will drop an entry at the rollover point.

    Also, it may overrun the reader and overwrite entries before the reader can see the entries (i.e. a race condition).

    The reader and writer both need to see each others current queue indexes (i.e. they shouldn't be function scoped static) to prevent overrun/race

    There should be two condition variables and not just one:

    1. The writer detects the queue is full and needs to block until the reader has drained an entry
    2. The reader detects an empty queue and needs to block until the writer adds a new entry

    Here's a refactored version of your code that should address these issues. I've added some debug code. It may not be perfect [and may err on the side of conservatism], but it should get you a bit further [please pardon the gratuitous style cleanup]:

    #include <stdio.h>
    #include <time.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>
    #include <unistd.h>
    
    #define NUM_ALM 5
    #define ERROR   -1
    #define OK      0
    
    double tvzero;
    
    //even IDs = alarm active
    //odd IDs  = alarm clear
    enum alarmid {
        BFD_ACT = 0x02,
        BFD_CLR = 0x03,
        LOS_ACT = 0x0C
    };
    
    typedef struct alarm_s {
        long timestamp;
        int alarmid;
        int arg1;
        int arg2;
    } alarm_t;
    
    void alarm_add(int id, int arg1, int arg2);
    int next_alarm_read(alarm_t * res);
    void *alarm_reader(void *arg);
    
    static alarm_t *roller;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    
    // reader variables
    pthread_cond_t cv_notempty;             // writer signals when queue not empty
    volatile int need_notempty;             // reader sets this before waiting
    volatile int idxdeq;                    // reader's queue index
    
    // writer variables
    pthread_cond_t cv_notfull;              // reader signals when queue not full
    volatile int need_notfull;              // writer sets this before waiting
    volatile int idxenq;                    // writer's queue index
    
    volatile int stopall;
    
    double
    tvgetf(void)
    {
        struct timespec ts;
        double sec;
    
        clock_gettime(CLOCK_REALTIME,&ts);
    
        sec = ts.tv_nsec;
        sec /= 1e9;
        sec += ts.tv_sec;
    
        sec -= tvzero;
    
        return sec;
    }
    
    #define DBG(_reason) \
        dbg(_reason)
    
    void
    dbg(const char *reason)
    {
        double tvnow;
    
        tvnow = tvgetf();
        printf("[%.9f] %s\n",tvnow,reason);
    }
    
    int
    main(void)
    {
        int i = 0;
        pthread_t reader;
        int ret;
    
        tvzero = tvgetf();
    
        roller = calloc(NUM_ALM, sizeof(alarm_t));
        printf("allocated memory: %lukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);
    
        // NOTE: queuing more than a full queue here will cause writer to block
        // forever because reader is not yet started
        for (i = 1; i < NUM_ALM; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    
        ret = pthread_create(&reader, NULL, alarm_reader, NULL);
        if (ret) {
            printf("Error - pthread_create() return code: %d\n", ret);
            return ERROR;
        }
    
    #if 0
        sleep(1);
    #endif
    
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_CLR, 8, 0);
        alarm_add(BFD_ACT, 8, 0);
    
        // tell reader that all items are queued and it should stop when it
        // processes the final item
        pthread_mutex_lock(&mutex);
        stopall = 1;
        if (need_notempty)
            pthread_cond_signal(&cv_notempty);
        pthread_mutex_unlock(&mutex);
    
        pthread_join(reader, NULL);
    
        return 0;
    }
    
    // RETURNS: queue index to process (-1=empty)
    int
    queue_notempty(void)
    {
        int curidx;
    
        do {
            curidx = idxdeq;
    
            // queue is empty
            if (curidx == idxenq) {
                curidx = -1;
                break;
            }
    
            // advance dequeue index
            idxdeq += 1;
            idxdeq %= NUM_ALM;
        } while (0);
    
        return curidx;
    }
    
    // RETURNS: queue index to use (-1=full)
    int
    queue_notfull(void)
    {
        int nxtidx;
        int curidx;
    
        do {
            // get current index
            curidx = idxenq;
    
            // advance to next slot (wrapping if necessary)
            nxtidx = curidx;
            nxtidx += 1;
            nxtidx %= NUM_ALM;
    
            // queue is full
            if (nxtidx == idxdeq) {
                curidx = -1;
                break;
            }
    
            // store back adjusted index
            idxenq = nxtidx;
        } while (0);
    
        return curidx;
    }
    
    void *
    alarm_reader(void *arg)
    {
        alarm_t dat = { 0 };
    
        while (1) {
            if (next_alarm_read(&dat))
                break;
            printf("read alarm id %d, arg1 %d,arg2 %d\n",
                dat.alarmid, dat.arg1, dat.arg2);
        }
    
        printf("alarm_reader exit!\n");
    
        return (void *) 0;
    }
    
    void
    alarm_add(int id, int arg1, int arg2)
    {
        int curidx;
        alarm_t *rol;
    
        pthread_mutex_lock(&mutex);
    
        while (1) {
            curidx = queue_notfull();
    
            // have an open slot -- store item into it
            if (curidx >= 0) {
                rol = &roller[curidx];
    
                rol->timestamp = time(NULL);
                rol->alarmid = id;
                rol->arg1 = arg1;
                rol->arg2 = arg2;
    
                printf("added id %d, arg1 %d, arg2 %d @%d\n",
                    rol->alarmid, rol->arg1, rol->arg2, curidx);
    
                // unblock reader if necessary
                if (need_notempty) {
                    DBG("writer signal notempty");
                    need_notempty = 0;
                    pthread_cond_signal(&cv_notempty);
                }
    
                break;
            }
    
            // queue is full -- wait for reader to free up some space
            DBG("writer need_notfull");
            need_notfull = 1;
            pthread_cond_wait(&cv_notfull,&mutex);
            DBG("writer wakeup");
        }
    
        pthread_mutex_unlock(&mutex);
    }
    
    // RETURNS: 1=stop, 0=normal
    int
    next_alarm_read(alarm_t *res)
    {
        //static long prev_time = 0;
        int curidx;
        alarm_t *rol;
        int stopflg = 0;
    
        pthread_mutex_lock(&mutex);
    
        while (1) {
            curidx = queue_notempty();
    
            // queue has an entry -- process it
            if (curidx >= 0) {
                rol = &roller[curidx];
    
                printf("next_alarm_read() reading @%d\n", curidx);
                *res = *rol;
                //prev_time = rol->timestamp;
    
                // if writer is waiting/blocking, wake it up because we just
                // freed up a queue slot
                if (need_notfull) {
                    DBG("reader signal notfull");
                    need_notfull = 0;
                    pthread_cond_signal(&cv_notfull);
                }
    
                break;
            }
    
            // stop when master has enqueued everything
            stopflg = stopall;
            if (stopflg)
                break;
    
            // queue is empty -- we must wait for writer to add something
            DBG("reader need_notempty");
            need_notempty = 1;
            pthread_cond_wait(&cv_notempty,&mutex);
        }
    
        pthread_mutex_unlock(&mutex);
    
        return stopflg;
    }
    

    UPDATE:

    I don't understand the do while(0); "loops" in the two Q functions, can you elaboratea little, please?

    The do while(0) is a technique that I use a lot to replace if/else ladder logic. I didn't invent it [it's discussed in some style guides, notably, "Code Complete"], but a lot of people that I've shown it to seem to like it. See my answer: About the exclusiveness of the cases of an if block for a better explanation.

    And I guessx what my initrial post didn't include is: the master should be able to enqueue things on an ongoing basis, there's no stopall and the reader should start reading as soon as something is available.

    Actually, I did realize that and the code I posted allows for that.

    You may want to issue the pthread_create before enqueuing any messages to prevent the potential deadlock I mentioned in the code comments.

    A fix for this would be to remove stopall, the pthread_cond-signal() (from main) is already done inside alarm_add() so this should work fine.

    The stopall is not to synchronize against overflow/underflow. It is merely if the writer (main thread) wants the receiver/thread to finish up and stop cleanly. It's more like a way to send an "EOF" condition to the reader.

    If your application is to run "forever", you can remove the stopall.

    Or, a cleaner way to signal "EOF": the main thread could enqueue a special "stop" message (e.g. a message with a timestamp of -1) to tell the receiver that no more messages will be sent ever and we wish to terminate the program.


    What I suggest is that you add a "diagnostic mode" to validate your program:

    Have the main do the pthread_create and then do:

        for (i = 1; i < 10000000; i++) {
            alarm_add(LOS_ACT, i, 0);
        }
    

    The reader should examine the arg1 values that come in. They should increment as above. If they don't, there is a logic error or race condition.


    Here is an updated version of my code with a -D option for a diagnostic/unit test mode. Note that all printing is disabled to allow it to run at extreme speed:

    #include <stdio.h>
    #include <time.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>
    #include <unistd.h>
    
    #define NUM_ALM 5
    #define ERROR   -1
    #define OK      0
    
    int opt_diag;
    double tvzero;
    
    //even IDs = alarm active
    //odd IDs  = alarm clear
    enum alarmid {
        BFD_ACT = 0x02,
        BFD_CLR = 0x03,
        LOS_ACT = 0x0C
    };
    
    typedef struct alarm_s {
        long timestamp;
        int alarmid;
        int arg1;
        int arg2;
    } alarm_t;
    
    void alarm_add(int id, int arg1, int arg2);
    int next_alarm_read(alarm_t * res);
    void *alarm_reader(void *arg);
    
    static alarm_t *roller;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    
    // reader variables
    pthread_cond_t cv_notempty;             // writer signals when queue not empty
    volatile int need_notempty;             // reader sets this before waiting
    volatile int idxdeq;                    // reader's queue index
    
    // writer variables
    pthread_cond_t cv_notfull;              // reader signals when queue not full
    volatile int need_notfull;              // writer sets this before waiting
    volatile int idxenq;                    // writer's queue index
    
    volatile int stopall;
    
    double
    tvgetf(void)
    {
        struct timespec ts;
        double sec;
    
        clock_gettime(CLOCK_REALTIME,&ts);
    
        sec = ts.tv_nsec;
        sec /= 1e9;
        sec += ts.tv_sec;
    
        sec -= tvzero;
    
        return sec;
    }
    
    #define prtf(_fmt...) \
        do { \
            if (opt_diag) \
                break; \
            printf(_fmt); \
        } while (0)
    
    #define DBG(_reason) \
        dbg(_reason)
    
    void
    dbg(const char *reason)
    {
        double tvnow;
    
        if (! opt_diag) {
            tvnow = tvgetf();
            printf("[%.9f] %s\n",tvnow,reason);
        }
    }
    
    int
    main(int argc,char **argv)
    {
        int i = 0;
        char *cp;
        pthread_t reader;
        int ret;
    
        --argc;
        ++argv;
    
        for (;  argc > 0;  --argc, ++argv) {
            cp = *argv;
            if (*cp != '-')
                break;
    
            switch (cp[1]) {
            case 'D':
                cp += 2;
                opt_diag = (*cp != 0) ? atoi(cp) : 10000000;
                break;
            }
        }
    
        tvzero = tvgetf();
    
        roller = calloc(NUM_ALM, sizeof(alarm_t));
        printf("allocated memory: %lukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);
    
        // NOTE: queuing more than a full queue here will cause writer to block
        // forever because reader is not yet started
        if (! opt_diag) {
            for (i = 1; i < NUM_ALM; i++) {
                alarm_add(LOS_ACT, i, 0);
            }
        }
    
        ret = pthread_create(&reader, NULL, alarm_reader, NULL);
        if (ret) {
            printf("Error - pthread_create() return code: %d\n", ret);
            return ERROR;
        }
    
    #if 0
        sleep(1);
    #endif
    
        if (opt_diag) {
            for (i = 1; i < opt_diag; i++) {
                alarm_add(LOS_ACT, i, 0);
            }
        }
        else {
            alarm_add(BFD_ACT, 8, 0);
            alarm_add(BFD_ACT, 8, 0);
            alarm_add(BFD_ACT, 8, 0);
            alarm_add(BFD_ACT, 8, 0);
            alarm_add(BFD_CLR, 8, 0);
            alarm_add(BFD_CLR, 8, 0);
            alarm_add(BFD_CLR, 8, 0);
            alarm_add(BFD_CLR, 8, 0);
            alarm_add(BFD_ACT, 8, 0);
        }
    
        // tell reader that all items are queued and it should stop when it
        // processes the final item
        pthread_mutex_lock(&mutex);
        stopall = 1;
        if (need_notempty)
            pthread_cond_signal(&cv_notempty);
        pthread_mutex_unlock(&mutex);
    
        pthread_join(reader, NULL);
    
        return 0;
    }
    
    // RETURNS: queue index to process (-1=empty)
    int
    queue_notempty(void)
    {
        int curidx;
    
        do {
            curidx = idxdeq;
    
            // queue is empty
            if (curidx == idxenq) {
                curidx = -1;
                break;
            }
    
            // advance dequeue index
            idxdeq += 1;
            idxdeq %= NUM_ALM;
        } while (0);
    
        return curidx;
    }
    
    // RETURNS: queue index to use (-1=full)
    int
    queue_notfull(void)
    {
        int nxtidx;
        int curidx;
    
        do {
            // get current index
            curidx = idxenq;
    
            // advance to next slot (wrapping if necessary)
            nxtidx = curidx;
            nxtidx += 1;
            nxtidx %= NUM_ALM;
    
            // queue is full
            if (nxtidx == idxdeq) {
                curidx = -1;
                break;
            }
    
            // store back adjusted index
            idxenq = nxtidx;
        } while (0);
    
        return curidx;
    }
    
    void *
    alarm_reader(void *arg)
    {
        alarm_t dat = { 0 };
        static int expval = 1;
    
        while (1) {
            if (next_alarm_read(&dat))
                break;
    
            if (opt_diag) {
                if (dat.arg1 != expval) {
                    printf("expected: %d got %d\n",expval,dat.arg1);
                    exit(1);
                }
                ++expval;
            }
    
            prtf("read alarm id %d, arg1 %d,arg2 %d\n",
                dat.alarmid, dat.arg1, dat.arg2);
        }
    
        printf("alarm_reader exit!\n");
    
        return (void *) 0;
    }
    
    void
    alarm_add(int id, int arg1, int arg2)
    {
        int curidx;
        alarm_t *rol;
    
        pthread_mutex_lock(&mutex);
    
        while (1) {
            curidx = queue_notfull();
    
            // have an open slot -- store item into it
            if (curidx >= 0) {
                rol = &roller[curidx];
    
                rol->timestamp = time(NULL);
                rol->alarmid = id;
                rol->arg1 = arg1;
                rol->arg2 = arg2;
    
                prtf("added id %d, arg1 %d, arg2 %d @%d\n",
                    rol->alarmid, rol->arg1, rol->arg2, curidx);
    
                // unblock reader if necessary
                if (need_notempty) {
                    DBG("writer signal notempty");
                    need_notempty = 0;
                    pthread_cond_signal(&cv_notempty);
                }
    
                break;
            }
    
            // queue is full -- wait for reader to free up some space
            DBG("writer need_notfull");
            need_notfull = 1;
            pthread_cond_wait(&cv_notfull,&mutex);
            DBG("writer wakeup");
        }
    
        pthread_mutex_unlock(&mutex);
    }
    
    // RETURNS: 1=stop, 0=normal
    int
    next_alarm_read(alarm_t *res)
    {
        //static long prev_time = 0;
        int curidx;
        alarm_t *rol;
        int stopflg = 0;
    
        pthread_mutex_lock(&mutex);
    
        while (1) {
            curidx = queue_notempty();
    
            // queue has an entry -- process it
            if (curidx >= 0) {
                rol = &roller[curidx];
    
                prtf("next_alarm_read() reading @%d\n", curidx);
                *res = *rol;
                //prev_time = rol->timestamp;
    
                // if writer is waiting/blocking, wake it up because we just
                // freed up a queue slot
                if (need_notfull) {
                    DBG("reader signal notfull");
                    need_notfull = 0;
                    pthread_cond_signal(&cv_notfull);
                }
    
                break;
            }
    
            // stop when master has enqueued everything
            stopflg = stopall;
            if (stopflg)
                break;
    
            // queue is empty -- we must wait for writer to add something
            DBG("reader need_notempty");
            need_notempty = 1;
            pthread_cond_wait(&cv_notempty,&mutex);
        }
    
        pthread_mutex_unlock(&mutex);
    
        return stopflg;
    }
    

    Here is a version of your original code with the diagnostic option added:

    #include <stdio.h>
    #include <time.h>
    #include <stdlib.h>
    #include <string.h>
    #include <pthread.h>
    #include <unistd.h>
    
    int opt_diag;
    
    #define NUM_ALM 5
    #define ERROR   -1
    #define OK      0
    
    //even IDs = alarm active
    //odd IDs  = alarm clear
    enum alarmid {
        BFD_ACT = 0x02,
        BFD_CLR = 0x03,
        LOS_ACT = 0x0C
    };
    typedef struct alarm_s {
        long timestamp;
        int alarmid;
        int arg1;
        int arg2;
    } alarm_t;
    int alarm_add(int id, int arg1, int arg2);
    int next_alarm_read(alarm_t * res);
    void *alarm_reader(void *arg);
    
    static alarm_t *roller;
    pthread_cond_t cv;
    pthread_mutex_t mutex;
    
    #define prtf(_fmt...) \
        do { \
            if (opt_diag) \
                break; \
            printf(_fmt); \
        } while (0)
    
    int
    main(int argc,char **argv)
    {
        int i = 0;
        char *cp;
        pthread_t reader;
        int ret;
    
        --argc;
        ++argv;
    
        for (;  argc > 0;  --argc, ++argv) {
            cp = *argv;
            if (*cp != '-')
                break;
    
            switch (cp[1]) {
            case 'D':
                cp += 2;
                opt_diag = (*cp != 0) ? atoi(cp) : 10000000;
                break;
            }
        }
    
        roller = calloc(NUM_ALM, sizeof(alarm_t));
        printf("allocated memory: %lukB\n", (sizeof(alarm_t) * NUM_ALM) / 1024);
    
        if (! opt_diag) {
            for (i = 1; i < NUM_ALM; i++) {
                alarm_add(LOS_ACT, i, 0);
            }
        }
    
        ret = pthread_create(&reader, NULL, alarm_reader, NULL);
        if (ret) {
            printf("Error - pthread_create() return code: %d\n", ret);
            return ERROR;
        }
    
        if (opt_diag) {
            for (i = 1; i < opt_diag; i++) {
                alarm_add(LOS_ACT, i, 0);
            }
        }
        else {
            sleep(1);
            alarm_add(BFD_ACT, 8, 0);
            alarm_add(BFD_ACT, 8, 0);
            alarm_add(BFD_ACT, 8, 0);
            alarm_add(BFD_ACT, 8, 0);
            alarm_add(BFD_CLR, 8, 0);
            alarm_add(BFD_CLR, 8, 0);
            alarm_add(BFD_CLR, 8, 0);
            alarm_add(BFD_CLR, 8, 0);
            alarm_add(BFD_ACT, 8, 0);
        }
    
        pthread_join(reader, NULL);
    }
    
    void *
    alarm_reader(void *arg)
    {
        static alarm_t dat = { 0 };
        int expval = 1;
        int err = 0;
    
        while (err <= 2) {
            if (next_alarm_read(&dat) == OK) {
                prtf("read alarm id %d, arg1 %d,arg2 %d\n", dat.alarmid, dat.arg1, dat.arg2);
                if (opt_diag) {
                    if (dat.arg1 != expval) {
                        printf("expected: %d got %d\n",expval,dat.arg1);
                        exit(1);
                    }
                    ++expval;
                }
            }
            else {
                prtf("alarm_reader() next_alarm_read() returned ERROR, wait\n");
                pthread_mutex_lock(&mutex);
                pthread_cond_wait(&cv, &mutex);
                pthread_mutex_unlock(&mutex);
    
                err++;
            }
        }
        printf("alarm_reader exit!\n");
    
        return (void *) 0;
    }
    
    int
    alarm_add(int id, int arg1, int arg2)
    {
        static int i = 0;
        alarm_t dat = { 0 };
        if (i < NUM_ALM) {
            dat.timestamp = time(NULL);
            dat.alarmid = id;
            dat.arg1 = arg1;
            dat.arg2 = arg2;
    
            if (&roller[i]) {
                memcpy(&roller[i], &dat, sizeof(alarm_t));
                if (i + 1 < NUM_ALM)
                    roller[i + 1].alarmid = 0;
                else
                    roller[0].alarmid = 0;
                pthread_cond_signal(&cv);
                prtf("added id %d, arg1 %d, arg2 %d @%d\n", roller[i].alarmid, roller[i].arg1, roller[i].arg2, i);
                i++;
            }
        }
        else {
            i = 0;
        }
        return 0;
    }
    
    int
    next_alarm_read(alarm_t * res)
    {
        static int i = 0;
        //static long prev_time = 0;
    
        if (!res)
            return ERROR;
    
        if (i < NUM_ALM) {
            if (roller[i].alarmid != 0) {
                prtf("next_alarm_read() reading @%d\n", i);
                res->timestamp = roller[i].timestamp;
                res->alarmid = roller[i].alarmid;
                res->arg1 = roller[i].arg1;
                res->arg2 = roller[i].arg2;
                //prev_time = roller[i].timestamp;
                i++;
            }
            else {
                prtf("next_alarm_read() @%d is %d,return ERROR\n", i, roller[i].alarmid);
    
                return ERROR;
            }
        }
        else {
            i = 0;
        }
        return OK;
    }