Tags: c++, c, multithreading, doubly-linked-list, circular-buffer

C Simple RingBuffer - Multithreading - Finding Critical Sections


So I wrote a simple C ring buffer that I'm now testing with multiple threads, and I'm having a hard time getting the code to fail so that I can identify the critical sections.

Note: The code is in C, but I'm testing it from C++ files because it's easier to create threads, mutexes, etc.

Header File:

#ifndef _C_TEST_H_
#define _C_TEST_H_

#include <stdio.h>

///////////////////////////////////////////////////////////////////////////////
// Defines and macros
///////////////////////////////////////////////////////////////////////////////
#ifndef __cplusplus
typedef enum { false, true } bool;
#endif

#define RING_BUFFER_SIZE 2000

///////////////////////////////////////////////////////////////////////////////
// Structures, Enumerations, Typedefs
///////////////////////////////////////////////////////////////////////////////

typedef struct Node
{
    int val;
    struct Node *next;
    struct Node *previous;
} Node_T;

typedef enum RB_ERC
{
    RB_ERC_NO_ERROR,
    RB_ERC_NULL_PTR,
    RB_ERC_UNDERFLOW,
    RB_ERC_OVERFLOW
} RB_ERC_T;

typedef enum RB_HANDLE_OVERFLOW
{
    RB_DECIMATE,
    RB_IGNORE_AND_RETURN_ERROR
} RB_HANDLE_OVERFLOW_T;

typedef enum RB_READ_MODE
{
    RB_FIFO,
    RB_LIFO
} RB_READ_MODE_T;


typedef struct RingBuffer
{
    int curSize;
    RB_HANDLE_OVERFLOW_T handleOverflow;
    struct Node *Write;
    struct Node *Read;
    Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;


///////////////////////////////////////////////////////////////////////////////
// Prototypes
///////////////////////////////////////////////////////////////////////////////

#ifdef __cplusplus
extern "C" {
#endif

RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T ifOverflow_);

//Return true if the queue has no elements; false if there are elements on the queue
bool RB_IsEmpty(RING_BUFFER_T *rb_);

//Return true if the queue is full; false if there are seats available
bool RB_IsFull(RING_BUFFER_T *rb_);

//Write N elements (length of the array) to the queue
//Note: array values will be read from element 0 to array length
RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_);

//Write 1 element
RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_);

//Dequeue and read N elements (length of the array) into an array
RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_);

//Dequeue and read 1 element
RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_);

#ifdef __cplusplus
}
#endif


#endif //_C_TEST_H_

Source:

#include "CTest.h"

static std::mutex m;

RB_ERC_T RB_InitRingBuffer(RING_BUFFER_T *rb_, RB_HANDLE_OVERFLOW_T handleOverflow_)
{
    //m.lock();
    RB_ERC_T erc = RB_ERC_NO_ERROR;
    int i;

    if(rb_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }

    //Initialize this instance of the ring buffer
    //Both the read/write pointers should start at the same location
    rb_->curSize            = 0;
    rb_->Read               = &rb_->buffer[0];
    rb_->Write              = &rb_->buffer[0];
    rb_->handleOverflow     = handleOverflow_;

    //Build the circular doubly-linked list
    for(i = 0; i < RING_BUFFER_SIZE; i++)
    {
        rb_->buffer[i].val = 0;
        if(i == 0)
        {
            //Sentinel node found. Point the first node to the last element of the array
            rb_->buffer[i].previous = &rb_->buffer[(RING_BUFFER_SIZE - 1)];
            rb_->buffer[i].next     = &rb_->buffer[i + 1];
        }
        else if(i < (RING_BUFFER_SIZE - 1) )
        {
            rb_->buffer[i].next     = &rb_->buffer[i + 1];
            rb_->buffer[i].previous = &rb_->buffer[i - 1];
        }
        else
        {
            //Sentinel node found. Reached the last element in the array; point the sentinel
            //node to the first element in the array to create a circular linked list.
            rb_->buffer[i].next     = &rb_->buffer[0];
            rb_->buffer[i].previous = &rb_->buffer[i - 1];
        }
    }
    //m.unlock();
    return erc;
}

bool RB_IsEmpty(RING_BUFFER_T *rb_)
{
    //m.lock();
    //Note: assume rb is valid.

    if(rb_->curSize == 0)
    {
        return true;
    }
    else
    {
        return false;
    }
    //m.unlock();
}

bool RB_IsFull(RING_BUFFER_T *rb_)
{
    //m.lock();
    //Note: assume rb is valid.

    if(rb_->curSize == RING_BUFFER_SIZE)
    {
        return true;
    }
    else
    {
        return false;
    }
    //m.unlock();
}


RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_)
{
    //m.lock();
    RB_ERC_T erc = RB_ERC_NO_ERROR;
    int i;

    if(rb_ == 0 || values_ == 0 || length_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }

    switch(rb_->handleOverflow)
    {
        //Increment through the array and enqueue

        //If attempting to write more elements than are available on the queue
        //Decimate                  - overwrite old data
        //Ignore and return error   - Don't write any data and throw an error
        case RB_DECIMATE:
            for(i = 0; i < length_; i++)
            {
                RB_Write(rb_, values_[i] );
            }
            break;
        default:
        case RB_IGNORE_AND_RETURN_ERROR:
        {
            int numSeatsAvailable = (RING_BUFFER_SIZE - rb_->curSize);
            if( length_ <= numSeatsAvailable )
            {
                //Increment through the array and enqueue
                for(i = 0; i < length_; i++)
                {
                    RB_Write(rb_, values_[i] );
                }
            }
            else
            {
                //Attempted to write more elements than there are free seats in the queue
                erc = RB_ERC_OVERFLOW;
            }
        }
            break;
    }
    //m.unlock();
    return erc;
}

RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_)
{
    //m.lock();
    RB_ERC_T erc = RB_ERC_NO_ERROR;

    if(rb_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }

    if( !RB_IsFull(rb_) )
    {
        //Write the value to the current location, then increment the write pointer
        //so that the write pointer is always pointing 1 element ahead of the queue
        rb_->Write->val = val_;
        rb_->Write      = rb_->Write->next;

        rb_->curSize++;
    }
    else
    {
        //Overflow
        switch(rb_->handleOverflow)
        {
            case RB_DECIMATE:
                //Set the value and increment both the read/write pointers
                rb_->Write->val = val_;
                rb_->Write      = rb_->Write->next;
                rb_->Read       = rb_->Read->next;
                break;
            default:
            case RB_IGNORE_AND_RETURN_ERROR:
                erc = RB_ERC_OVERFLOW;
                break;
        }
    }
    //m.unlock();
    return erc;
}



RB_ERC_T RB_ReadArray(RING_BUFFER_T *rb_, int values_[], int length_, RB_READ_MODE_T readMode_)
{
    //m.lock();
    RB_ERC_T erc = RB_ERC_NO_ERROR;

    if(rb_ == 0 || values_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }

    //Verify that the amount of data to be read is actually available on the queue
    if( length_ <= rb_->curSize )
    {
        //Increment through the array and dequeue
        int i;
        for(i = 0; i < length_; i++)
        {
            //Note: Error conditions have already been checked. Skip the ERC check
            (void) RB_Read(rb_, &values_[i], readMode_);
        }
    }
    else
    {
        //Attempted to read more data than is available on the queue
        erc = RB_ERC_UNDERFLOW;
    }
    //m.unlock();
    return erc;
}


RB_ERC_T RB_Read(RING_BUFFER_T *rb_, int *readVal_, RB_READ_MODE_T readMode_)
{
    //m.lock();
    RB_ERC_T erc = RB_ERC_NO_ERROR;

    if(rb_ == 0 || readVal_ == 0)
    {
        return RB_ERC_NULL_PTR;
    }

    if( !RB_IsEmpty(rb_) )
    {
        switch(readMode_)
        {
            case RB_LIFO:
                //Use the head (Write) to read the most recently written value (newest data)

                //Note: The write pointer is always pointing 1 position ahead of the current queue.
                rb_->Write = rb_->Write->previous;      //Decrement write pointer

                //Read the data
                *readVal_       = rb_->Write->val;
                rb_->Write->val = 0;                    //Reset read values to 0
                break;
            default:
            case RB_FIFO:
                *readVal_       = rb_->Read->val;
                rb_->Read->val  = 0;                    //Reset read values to 0

                rb_->Read       = rb_->Read->next;      //Increment read pointer
                break;
        }

        rb_->curSize--;
    }
    else
    {
        //Attempted to read more data but there is no data available on the queue
        erc = RB_ERC_UNDERFLOW;
    }
    //m.unlock();
    return erc;
}

Main CPP used for the tests:

#include "CTest.h"
#include <iostream>
#include "windows.h"

#include <thread>
using namespace std;

static RING_BUFFER_T test1;

const int dataSize = 300;
const int dataSizeout = 1000;
int sharedValue = 0;
static std::mutex m;


void function1()
{
    int data[dataSize];
    RB_ERC_T erc = RB_ERC_NO_ERROR;

    for (int i = 0; i < dataSizeout; i++)
    {
        erc = RB_Write(&test1, i);
        if (erc != RB_ERC_NO_ERROR)
        {
            printf("Count down errrror %d\n", erc);
        }
    }

    //RB_WriteArray(&test1, data, dataSize);
}


void function2()
{
    RB_ERC_T erc = RB_ERC_NO_ERROR;

    for (int i = 0; i > -dataSizeout; i--)
    {
        erc = RB_Write(&test1, i);
        if (erc != RB_ERC_NO_ERROR)
        {
            printf("Count down errrror %d\n", erc);
        }
    }

}

int main()
{
    RB_InitRingBuffer(&test1, RB_DECIMATE);

    thread p1(function1);
    //Sleep(1000);
    thread p2(function2);
    p1.join();
    p2.join();

    //Read everything back out, one element at a time
    int out;
    int cnt = 0;

    while(cnt < (2 * dataSizeout) )
    {
        if (RB_Read(&test1, &out, RB_LIFO) == RB_ERC_NO_ERROR)
        {
            printf("out[%d] = %d\n", cnt, out);

            cnt += 1;
        }
    }



    system("Pause");

    return 0;
}

I'm thinking that everything in the main RING_BUFFER_T instance counts as shared state, so every place it's touched (which is pretty much everywhere) would have to be enclosed in a mutex.

typedef struct RingBuffer
{
    int curSize;
    RB_HANDLE_OVERFLOW_T handleOverflow;
    struct Node *Write;
    struct Node *Read;
    Node_T buffer[RING_BUFFER_SIZE];
} RING_BUFFER_T;

I suppose Node_T would be shared as well, but only during initialization. Am I wrong, or shouldn't the elements being stuffed into the ring buffer end up out of order, since no mutex is being used right now?
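
For reference, the failure I'd expect to be easiest to catch is a lost update on curSize, since rb_->curSize++ is an unsynchronized read-modify-write. A minimal sketch of such a check (the writer helper and the expected-count comparison are illustrative, not part of my test code above):

#include "CTest.h"
#include <cstdio>
#include <thread>

static RING_BUFFER_T rb;

//Each writer pushes 1000 values, so the 2000-slot buffer should end up
//exactly full. A lost increment on the unsynchronized curSize++ shows up
//as a final count below 2000.
static void writer(int base)
{
    for (int i = 0; i < 1000; i++)
    {
        (void) RB_Write(&rb, base + i);
    }
}

int main()
{
    RB_InitRingBuffer(&rb, RB_IGNORE_AND_RETURN_ERROR);

    std::thread t1(writer, 0);
    std::thread t2(writer, 1000);
    t1.join();
    t2.join();

    printf("curSize = %d (expected 2000)\n", rb.curSize);
    return 0;
}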


Solution

  • Thou Shalt Not expose the functions RB_IsEmpty and RB_IsFull, as their return values may already be invalid by the time the caller acts on them. If you only call them from within the read/write functions, there is no need for protection inside those functions.

    Typically you must protect your struct within the externally exposed read and write functions, from the first access to the last. There is no need to protect parameter checking.

    You shall not double-lock. Do not call RB_Read from RB_ReadArray; provide an internal read function used by both, and do the same for the write functions, as in the sketch below.
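
    A sketch of that structure for the write side, assuming the header above (the _NoLock-suffixed internals are illustrative names, not part of the original API):

    #include "CTest.h"
    #include <mutex>

    static std::mutex m;

    //Internal helpers: no locking here; the caller already holds m.
    static bool IsFull_NoLock(RING_BUFFER_T *rb_)
    {
        return rb_->curSize == RING_BUFFER_SIZE;
    }

    static RB_ERC_T Write_NoLock(RING_BUFFER_T *rb_, int val_)
    {
        if( !IsFull_NoLock(rb_) )
        {
            rb_->Write->val = val_;
            rb_->Write      = rb_->Write->next;
            rb_->curSize++;
            return RB_ERC_NO_ERROR;
        }
        if(rb_->handleOverflow == RB_DECIMATE)
        {
            //Overwrite the oldest element and drag the read pointer along
            rb_->Write->val = val_;
            rb_->Write      = rb_->Write->next;
            rb_->Read       = rb_->Read->next;
            return RB_ERC_NO_ERROR;
        }
        return RB_ERC_OVERFLOW;
    }

    //Public entry points: parameter checks stay outside the lock; everything
    //from the first struct access to the last runs under one lock_guard.
    RB_ERC_T RB_Write(RING_BUFFER_T *rb_, int val_)
    {
        if(rb_ == 0)
        {
            return RB_ERC_NULL_PTR;
        }
        std::lock_guard<std::mutex> lock(m);
        return Write_NoLock(rb_, val_);
    }

    RB_ERC_T RB_WriteArray(RING_BUFFER_T *rb_, int values_[], int length_)
    {
        if(rb_ == 0 || values_ == 0 || length_ == 0)
        {
            return RB_ERC_NULL_PTR;
        }
        std::lock_guard<std::mutex> lock(m);    //One lock for the whole batch
        if( rb_->handleOverflow == RB_IGNORE_AND_RETURN_ERROR
            && length_ > (RING_BUFFER_SIZE - rb_->curSize) )
        {
            return RB_ERC_OVERFLOW;
        }
        for(int i = 0; i < length_; i++)
        {
            (void) Write_NoLock(rb_, values_[i]);   //Internal helper: no double lock
        }
        return RB_ERC_NO_ERROR;
    }

    The read side follows the same pattern: one Read_NoLock helper used by both RB_Read and RB_ReadArray, with each public function taking the lock exactly once.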