I implemented my own circular buffer. I want to use two threads to test it. One thread continuously writing to the buffer while the other continuously reading it. When certain amount of data are read, print out the benchmark.
The purpose of this circular buffer is write and read never access the same memory so there won't incur a race.
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32
long write_count = 0;
long read_count = 0;
struct cBuf {
int first;
int last;
int max_items;
int item_size;
int valid_items;
unsigned char *buffer;
};
void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
buf -> first = 0;
buf -> last = 0;
buf -> max_items = max_items;
buf -> item_size = item_size;
buf -> valid_items = 0;
buf -> buffer = calloc(max_items, item_size);
return;
}
int isEmpty(struct cBuf *buf) {
if (buf -> valid_items == 0) {
return 1;
}
else {
return 0;
}
}
int push(struct cBuf *buf, unsigned char *data) {
if (buf -> valid_items >= buf -> max_items) {
// buffer full
return -1;
}
else {
// push data into the buffer
memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);
// update cBuf info
buf -> valid_items++;
buf -> last = (buf -> last + 1) % (buf -> max_items);
return 0;
}
}
int pop(struct cBuf *buf, unsigned char *new_buf) {
if (isEmpty(buf)) {
// buffer empty
return -1;
}
else {
// read data
memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);
// update cBuf info
buf -> first = (buf -> first + 1) % (buf -> max_items);
buf -> valid_items--;
return 0;
}
}
void *write_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
while (1) {
unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
if (push(buf, hash) == 0) {
write_count++;
//printf("put %lu items into the buffer. valid_items: %d\n", write_count, buf -> valid_items);
}
free(hash);
if (write_count == MAX_NUM_ITEM) {
break;
}
}
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total write = %lu\n\n", write_count);
return NULL;
}
void *read_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
unsigned char *new_buf = malloc(HASH_SIZE);
while (1) {
if (pop(buf, new_buf) == 0) {
read_count++;
//printf("pop %lu items from the buffer. valid_items: %d\n", read_count, buf -> valid_items);
}
if (read_count == MAX_NUM_ITEM) {
break;
}
}
free(new_buf);
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total read = %lu\n\n", read_count);
}
int main(int argc, char const *argv[]) {
struct cBuf buf;
init_cBuf(&buf, 200, HASH_SIZE);
pthread_t write_thd, read_thd;
double diff = 0.0, t1 = 0.0, t2 = 0.0;
t1 = clock ();
pthread_create(&read_thd, NULL, read_hash, &buf);
pthread_create(&write_thd, NULL, write_hash, &buf);
pthread_join(write_thd, NULL);
pthread_join(read_thd, NULL);
t2 = clock ();
diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
printf ("----------------\nTotal time: %lf second\n", diff);
printf ("Total write: %lu\n", write_count);
printf ("write per-second: %lf\n\n", write_count / diff);
return 0;
}
It is very weird that the program never terminates if I left the "printf" in function "write_hash" and "read_hash" commented. If I uncommented these two "printf"s, the program will print out all push and pop info till the end and eventually the program exit successfully.
Please help me to find if it's because something wrong with my circular buffer implementation, or somewhere else.
Luke, like EOF said, you're going to have race conditions in your code (probably several). Here are a few thoughts I had when I saw your code:
When threads share memory, you need to have protections on that memory so that only one thread can access it at a time. You can do this with a mutex (see pthread_mutex_lock and pthread_mutex_unlock).
Also, I noticed that your if statements in your read and write threads check the result of push and pop to see if a character was actually pushed or popped. While that might work, you should probably use semaphores to synchronize your threads instead. Here's what I mean by that: When your write thread writes to the buffer, you should have something like sem_post(&buff_count_sem);
. Then, before your read thread reads a character, you should have sem_wait(&buff_count_sem);
so that you know there is at least one character in the buffer.
The same principle applies to the read thread. I suggest the read thread having sem_post(&space_left_sem)
just after popping a byte from the buffer and the write thread having sem_wait(&space_left_sem)
just before pushing to the buffer to ensure that there is room in the buffer for the byte you are trying to write.
Here are my suggested changes to your code (not tested):
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <time.h>
#include <unistd.h>
#define MAX_NUM_ITEM 10000
#define HASH_SIZE 32
long write_count = 0;
long read_count = 0;
pthread_mutex_t buff_mutex;
sem_t buff_count_sem;
sem_t space_left_sem;
struct cBuf {
int first;
int last;
int max_items;
int item_size;
int valid_items;
unsigned char *buffer;
};
void init_cBuf(struct cBuf *buf, int max_items, int item_size) {
buf -> first = 0;
buf -> last = 0;
buf -> max_items = max_items;
buf -> item_size = item_size;
buf -> valid_items = 0;
buf -> buffer = calloc(max_items, item_size);
return;
}
int isEmpty(struct cBuf *buf) {
if (buf -> valid_items == 0) {
return 1;
}
else {
return 0;
}
}
int push(struct cBuf *buf, unsigned char *data) {
pthread_mutex_lock(&buff_mutex);
if (buf -> valid_items >= buf -> max_items) {
// buffer full
pthread_mutex_unlock(&buff_mutex);
return -1;
}
else {
// push data into the buffer
memcpy(buf -> buffer + (buf -> last) * (buf -> item_size), data, buf -> item_size);
// update cBuf info
buf -> valid_items++;
buf -> last = (buf -> last + 1) % (buf -> max_items);
pthread_mutex_unlock(&buff_mutex);
return 0;
}
}
int pop(struct cBuf *buf, unsigned char *new_buf) {
phthread_mutex_lock(&buff_mutex);
if (isEmpty(buf)) {
// buffer empty
pthread_mutex_unlock(&buff_mutex);
return -1;
}
else {
// read data
memcpy(new_buf, buf -> buffer + (buf -> first) * (buf -> item_size), buf -> item_size);
// update cBuf info
buf -> first = (buf -> first + 1) % (buf -> max_items);
buf -> valid_items--;
pthread_mutex_unlock(&buff_mutex);
return 0;
}
}
void *write_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
while (1) {
unsigned char *hash = malloc(HASH_SIZE); // for simplicity I just create some data with 32-byte.
sem_wait(&space_left_sem);
push(buf, hash);
write_count++;
sem_post(&buff_count_sem);
free(hash);
if (write_count == MAX_NUM_ITEM) {
break;
}
}
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total write = %lu\n\n", write_count);
return NULL;
}
void *read_hash(void *ptr) {
struct cBuf *buf = (struct cBuf *)(ptr);
unsigned char *new_buf = malloc(HASH_SIZE);
while (1) {
sem_wait(&buff_count_sem);
pop(buf, new_buf);
read_count++;
sem_post(&space_left_sem);
if (read_count == MAX_NUM_ITEM) {
break;
}
}
free(new_buf);
printf (" thread id = %lu\n", (long unsigned) (pthread_self ()));
printf (" total read = %lu\n\n", read_count);
}
int main(int argc, char const *argv[]) {
struct cBuf buf;
init_cBuf(&buf, 200, HASH_SIZE);
pthread_t write_thd, read_thd;
pthread_mutex_init(&buff_mutex);
sem_init(&buff_count_sem, 0, 0); // the last parameter is the initial value - initially 0 if no data is in the buffer
sem_init(&space_left_sem, 0, buf.max_items);
double diff = 0.0, t1 = 0.0, t2 = 0.0;
t1 = clock ();
pthread_create(&read_thd, NULL, read_hash, &buf);
pthread_create(&write_thd, NULL, write_hash, &buf);
pthread_join(write_thd, NULL);
pthread_join(read_thd, NULL);
t2 = clock ();
diff = (double)((t2 - t1) / CLOCKS_PER_SEC);
printf ("----------------\nTotal time: %lf second\n", diff);
printf ("Total write: %lu\n", write_count);
printf ("write per-second: %lf\n\n", write_count / diff);
return 0;
}
I suggest reading up on how to properly use threads.
UPDATE: 1. Fixed a typo. 2. You'll also want to have mutex_lock and mutex_unlock in isEmpty and any other functions that you write for your circular buffers where multiple threads might access the same buffer.